package anyec;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadata;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Example of consuming a single topic partition with Kafka's low-level
 * {@link SimpleConsumer} API: the partition leader is looked up, messages are
 * fetched from it, and a new leader is searched for when a fetch fails.
 *
 * @author anyec
 */
public class KafkaSimpleConsumer {

    static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);

    /** Socket timeout (ms) used for every {@link SimpleConsumer} created here. */
    private static final int SO_TIMEOUT_MS = 100000;

    /** Socket receive buffer size (bytes) used for every {@link SimpleConsumer}. */
    private static final int BUFFER_SIZE = 64 * 1024;

    /** Maximum number of bytes requested per fetch. */
    private static final int FETCH_SIZE = 100000;

    /** Pause (ms) between polls when nothing was read and between leader lookups. */
    private static final long BACKOFF_MS = 1000L;

    /** Hosts of the replicas reported by the most recent successful metadata lookup. */
    private final List<String> replicaBrokers = new ArrayList<String>();

    public KafkaSimpleConsumer() {
    }

    /**
     * Runs the example against a hard-coded broker, topic and partition.
     *
     * @param args ignored
     */
    public static void main(String args[]) {
        KafkaSimpleConsumer example = new KafkaSimpleConsumer();
        // Maximum number of messages to read.
        long maxReads = 3L;
        // Topic to consume.
        String topic = "anytopic";
        // Partition to consume.
        int partition = 0;
        // Seed broker host names.
        List<String> seeds = new ArrayList<String>();
        seeds.add("ldap.anyec.cn");
        // Broker port.
        int port = 9092;
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            // Pass the throwable so the stack trace goes to the log, not stderr.
            logger.error("error:", e);
        }
    }

    /**
     * Reads up to {@code maxReads} messages from the given topic partition,
     * starting at the earliest available offset, and logs each one.
     *
     * @param maxReads    maximum number of messages to read
     * @param topic       topic to read from
     * @param partition   partition id within the topic
     * @param seedBrokers broker hosts used for the initial leader lookup
     * @param port        port used for every broker connection
     * @throws Exception if no new leader can be found after a fetch failure, or
     *                   if a broker call fails
     */
    public void run(long maxReads, String topic, int partition, List<String> seedBrokers, int port)
            throws Exception {
        // Look up the metadata of the requested topic partition.
        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
        if (metadata == null) {
            logger.error("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            logger.error("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + topic + "_" + partition;

        SimpleConsumer consumer = null;
        try {
            consumer = newConsumer(leadBroker, port, clientName);
            long readOffset = getOffset(consumer, topic, partition,
                    kafka.api.OffsetRequest.EarliestTime(), clientName);
            int numErrors = 0;
            while (maxReads > 0) {
                if (consumer == null) {
                    // The previous consumer was dropped after a fetch error.
                    consumer = newConsumer(leadBroker, port, clientName);
                }
                FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                        .addFetch(topic, partition, readOffset, FETCH_SIZE).build();
                FetchResponse fetchResponse = consumer.fetch(req);

                if (fetchResponse.hasError()) {
                    numErrors++;
                    short code = fetchResponse.errorCode(topic, partition);
                    logger.info("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                    if (numErrors > 5) {
                        break;
                    }
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // Requested offset is invalid: restart from the latest offset.
                        readOffset = getOffset(consumer, topic, partition,
                                kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    consumer.close();
                    consumer = null;
                    leadBroker = findNewLeader(leadBroker, topic, partition, port);
                    continue;
                }
                numErrors = 0;

                long numRead = 0;
                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                    if (maxReads <= 0) {
                        // Do not consume more than the caller asked for.
                        break;
                    }
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        logger.info("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                        continue;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    logger.info("{}: {}", currentOffset, decodePayload(messageAndOffset.message().payload()));
                    numRead++;
                    maxReads--;
                }

                if (numRead == 0) {
                    try {
                        Thread.sleep(BACKOFF_MS);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt for the caller and stop polling.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            // Close on every exit path, including exceptions from fetch/getOffset.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Asks the broker for one offset of the given partition before {@code whichTime}.
     *
     * @param consumer   consumer connected to the partition leader
     * @param topic      topic name
     * @param partition  partition id
     * @param whichTime  time selector passed to the offset request, e.g.
     *                   {@code kafka.api.OffsetRequest.EarliestTime()}
     * @param clientName client id sent with the request
     * @return the first offset returned by the broker, or {@code 0} if the
     *         response reports an error or contains no offsets
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
            String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
                kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            logger.error("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            // Guard against an empty result instead of failing on offsets[0].
            logger.error("Broker returned no offsets for [{}, {}]", topic, partition);
            return 0;
        }
        return offsets[0];
    }

    /**
     * Returns the offset obtained with {@code kafka.api.OffsetRequest.LatestTime()}.
     *
     * @see #getOffset(SimpleConsumer, String, int, long, String)
     */
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
        return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    }

    /**
     * Returns the offset obtained with {@code kafka.api.OffsetRequest.EarliestTime()}.
     *
     * @see #getOffset(SimpleConsumer, String, int, long, String)
     */
    public static long getEarliestOffset(SimpleConsumer consumer, String topic, int partition,
            String clientName) {
        return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    }

    /**
     * Calls {@link #getOffset} with {@code kafka.api.OffsetRequest.CurrentVersion()}
     * as the time selector.
     *
     * <p>NOTE(review): {@code CurrentVersion()} looks like the request API version
     * rather than a time value, so this probably does not return a "current"
     * offset - confirm the intended selector before relying on this method.
     */
    public static long getCurrentOffset(SimpleConsumer consumer, String topic, int partition,
            String clientName) {
        return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    }

    /**
     * Looks for a leader broker after a fetch failure, trying up to three times
     * against the replicas recorded by the last metadata lookup.
     *
     * @param oldLeader host of the leader that just failed
     * @param topic     topic name
     * @param partition partition id
     * @param port      broker port
     * @return host of the leader to use next
     * @throws Exception if no leader was found within the attempts
     * @author anyec
     */
    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
        for (int i = 0; i < 3; i++) {
            PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
            // On the first attempt the old leader is not accepted, so a new
            // leader election gets a chance to complete; later attempts accept it.
            boolean usable = metadata != null
                    && metadata.leader() != null
                    && !(i == 0 && oldLeader.equalsIgnoreCase(metadata.leader().host()));
            if (usable) {
                return metadata.leader().host();
            }
            try {
                Thread.sleep(BACKOFF_MS);
            } catch (InterruptedException ie) {
                // Preserve the interrupt and give up instead of retrying.
                Thread.currentThread().interrupt();
                break;
            }
        }
        logger.info("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Queries the seed brokers in order until one returns metadata for the
     * requested partition, then records that partition's replica hosts in
     * {@link #replicaBrokers}.
     *
     * @return the partition metadata, or {@code null} if no seed broker returned it
     */
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata found = null;
        for (String seed : seedBrokers) {
            found = fetchPartitionMetadata(seed, port, topic, partition);
            if (found != null) {
                break;
            }
        }
        if (found != null) {
            // Safe even when seedBrokers is replicaBrokers: iteration has finished.
            replicaBrokers.clear();
            for (BrokerEndPoint replica : found.replicas()) {
                replicaBrokers.add(replica.host());
            }
        }
        return found;
    }

    /**
     * Requests topic metadata from one broker and picks out the given partition.
     * Communication errors are logged and reported as {@code null}.
     */
    private PartitionMetadata fetchPartitionMetadata(String seed, int port, String topic, int partition) {
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(seed, port, SO_TIMEOUT_MS, BUFFER_SIZE, "leaderLookup");
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
            for (TopicMetadata item : resp.topicsMetadata()) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    if (part.partitionId() == partition) {
                        return part;
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error communicating with Broker [" + seed + "] to find Leader for [" + topic + ", "
                    + partition + "] Reason: " + e, e);
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
        return null;
    }

    /**
     * Commits {@code offset} for the given topic partition through the consumer
     * and returns the error code the broker reports for that partition.
     *
     * <p>NOTE(review): {@code errorCode} is passed as the second and third
     * constructor arguments of {@link OffsetAndMetadata}; presumably those are
     * timestamps rather than error codes - confirm against the Kafka version in use.
     *
     * @param simpleConsumer consumer used to send the commit request
     * @param partition      partition id
     * @param topic          topic name
     * @param kafkaClientId  used both as the first and as the fourth request argument
     * @param offset         offset to commit
     * @param errorCode      see the review note above
     * @return the per-partition error code from the commit response
     * @throws Exception any failure while committing, after it has been logged
     */
    public short saveOffsetInKafka(SimpleConsumer simpleConsumer, int partition, String topic,
            String kafkaClientId, long offset, short errorCode) throws Exception {
        short versionID = 0;
        int correlationId = 0;
        try {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(
                    new OffsetMetadata(offset, OffsetMetadata.NoMetadata()), errorCode, errorCode);
            Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>();
            mapForCommitOffset.put(tp, offsetMetaAndErr);
            kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(
                    kafkaClientId, mapForCommitOffset, correlationId, kafkaClientId, versionID);
            OffsetCommitResponse offsetCommitResp = simpleConsumer.commitOffsets(offsetCommitReq);
            return (Short) offsetCommitResp.errors().get(tp);
        } catch (Exception e) {
            logger.error("Error when commiting Offset to Kafka: " + e.getMessage(), e);
            throw e;
        }
    }

    /** Creates a consumer for {@code host:port} with this class's socket settings. */
    private static SimpleConsumer newConsumer(String host, int port, String clientName) {
        return new SimpleConsumer(host, port, SO_TIMEOUT_MS, BUFFER_SIZE, clientName);
    }

    /**
     * Decodes the readable bytes of a message payload as UTF-8.
     *
     * @return the decoded text, or {@code null} when the message has no payload
     */
    private static String decodePayload(ByteBuffer payload) {
        if (payload == null) {
            return null;
        }
        // remaining() is the number of readable bytes; limit() would over-allocate
        // and make get() underflow if the buffer's position were not zero.
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
package anyec;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import kafka.admin.AdminUtils;
//import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import scala.collection.Map;

/**
 * End-to-end Kafka example: creates a topic through ZooKeeper, sends 100000
 * keyed messages to it, then polls once and prints whatever was received.
 */
public class KafkaAdminMain {

    static String topicName = "uuu88";

    /** ZooKeeper connect string used by {@link #getZkClient()}. */
    private static final String ZK_URL = "localhost:12181";

    /** ZooKeeper session timeout (ms); equals the zkclient {@code ZkConnection} default. */
    private static final int ZK_SESSION_TIMEOUT_MS = 30000;

    /** ZooKeeper connection timeout (ms). */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 300 * 1000;

    public static void main(String[] args) {
        createTopics();
        sendMsg(topicName);
        receiveMsg(topicName);
    }

    /**
     * Makes sure {@link #topicName} exists and then reads entity configs from
     * ZooKeeper. Despite its name this performs the same steps as
     * {@link #createTopics()}.
     */
    public static void listTopics() {
        ensureTopicAndFetchConfigs();
    }

    /**
     * Opens a new ZooKeeper handle for the Kafka admin utilities.
     *
     * <p>The handle is built with {@code ZkUtils.apply}, which installs Kafka's
     * string serializer on the underlying client. A {@code ZkClient} created
     * with its default constructor uses Java serialization instead, so znodes
     * written through it would not be in the string form that
     * {@code ZkUtils.apply} produces.
     *
     * @return a connected {@link ZkUtils}; the caller must {@code close()} it
     */
    public static ZkUtils getZkClient() {
        return ZkUtils.apply(ZK_URL, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, false);
    }

    /**
     * Creates {@link #topicName} with 1 partition and replication factor 1 if it
     * does not exist yet, then reads entity configs from ZooKeeper.
     */
    public static void createTopics() {
        ensureTopicAndFetchConfigs();
    }

    /**
     * Deletes {@link #topicName} and its consumer-group info from ZooKeeper if
     * the topic exists.
     */
    public static void deleteTopics() {
        ZkUtils zk = getZkClient();
        try {
            if (AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zk, topicName);
                AdminUtils.deleteTopic(zk, topicName);
            }
        } finally {
            zk.close();
        }
    }

    /**
     * Builds a String/String producer for {@code kafka.anyec.cn:9092} with
     * {@code acks=0} and one retry.
     *
     * @return a new producer; the caller must close it
     */
    public static Producer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        props.put("acks", "0");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 1024 * 1024);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Parameterized instead of the raw type to avoid an unchecked conversion.
        return new KafkaProducer<String, String>(props);
    }

    /**
     * Builds a String/String consumer for {@code kafka.anyec.cn:9092} in group
     * {@code test} with auto-commit enabled. The consumer is not subscribed yet.
     *
     * @return a new consumer; the caller must close it
     */
    public static Consumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        // Consumer group id.
        props.put("group.id", "test");
        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Auto-commit interval, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout, in milliseconds.
        props.put("session.timeout.ms", "30000");
        // Deserializers for String keys and values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Sends 100000 records to {@code topic}, waiting up to 100 ms for each send.
     * Record {@code i} has key {@code i % 3} and value {@code "v" + i}.
     *
     * @param topic destination topic
     */
    public static void sendMsg(String topic) {
        Producer<String, String> producer = getProducer();
        try {
            for (int i = 0; i < 100000; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(topic, i % 3 + "", "v" + i);
                try {
                    producer.send(record).get(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt and stop sending; finally closes the producer.
                    Thread.currentThread().interrupt();
                    return;
                } catch (ExecutionException | TimeoutException e) {
                    // Best effort: report the failed record and carry on with the next.
                    e.printStackTrace();
                }
            }
            producer.flush();
        } finally {
            producer.close();
        }
    }

    /** Prints the message of a failed send; not attached to any send in this class. */
    static Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                System.out.println(e.getMessage());
            }
        }
    };

    /**
     * Subscribes to {@code topic}, polls once for up to 10 seconds, prints each
     * record as {@code key value}, commits, and closes the consumer.
     *
     * @param topic topic to read from
     */
    public static void receiveMsg(String topic) {
        Consumer<String, String> consumer = getConsumer();
        try {
            consumer.subscribe(Arrays.asList(topic));
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }

    /** Shared body of {@link #listTopics()} and {@link #createTopics()}. */
    private static void ensureTopicAndFetchConfigs() {
        ZkUtils zk = getZkClient();
        try {
            if (!AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.createTopic(zk, topicName, 1, 1, new Properties());
            }
            // NOTE(review): the second argument here is the topic name; presumably this
            // API expects an entity type instead - confirm. The result is not used.
            AdminUtils.fetchAllEntityConfigs(zk, topicName);
        } finally {
            // Release the ZooKeeper session that getZkClient() opened.
            zk.close();
        }
    }
}
package anyec;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import kafka.admin.AdminUtils;
//import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import scala.collection.Map;

/**
 * Consumer-side Kafka example: {@link #main} only runs {@link #receiveMsg},
 * which polls the topic in an endless loop and prints every record. The topic
 * and producer helpers are kept but not called from {@code main}.
 */
public class KafkaAdminMain2 {

    static String topicName = "uuu88";

    /** ZooKeeper connect string used by {@link #getZkClient()}. */
    private static final String ZK_URL = "localhost:12181";

    /** ZooKeeper session timeout (ms); equals the zkclient {@code ZkConnection} default. */
    private static final int ZK_SESSION_TIMEOUT_MS = 30000;

    /** ZooKeeper connection timeout (ms). */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 300 * 1000;

    public static void main(String[] args) {
        // createTopics();
        // sendMsg(topicName);
        receiveMsg(topicName);
    }

    /**
     * Makes sure {@link #topicName} exists and then reads entity configs from
     * ZooKeeper. Despite its name this performs the same steps as
     * {@link #createTopics()}.
     */
    public static void listTopics() {
        ensureTopicAndFetchConfigs();
    }

    /**
     * Opens a new ZooKeeper handle for the Kafka admin utilities.
     *
     * <p>The handle is built with {@code ZkUtils.apply}, which installs Kafka's
     * string serializer on the underlying client. A {@code ZkClient} created
     * with its default constructor uses Java serialization instead, so znodes
     * written through it would not be in the string form that
     * {@code ZkUtils.apply} produces.
     *
     * @return a connected {@link ZkUtils}; the caller must {@code close()} it
     */
    public static ZkUtils getZkClient() {
        return ZkUtils.apply(ZK_URL, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, false);
    }

    /**
     * Creates {@link #topicName} with 1 partition and replication factor 1 if it
     * does not exist yet, then reads entity configs from ZooKeeper.
     */
    public static void createTopics() {
        ensureTopicAndFetchConfigs();
    }

    /**
     * Deletes {@link #topicName} and its consumer-group info from ZooKeeper if
     * the topic exists.
     */
    public static void deleteTopics() {
        ZkUtils zk = getZkClient();
        try {
            if (AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zk, topicName);
                AdminUtils.deleteTopic(zk, topicName);
            }
        } finally {
            zk.close();
        }
    }

    /**
     * Builds a String/String producer for {@code kafka.anyec.cn:9092} with
     * {@code acks=all} and two retries.
     *
     * @return a new producer; the caller must close it
     */
    public static Producer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 30);
        props.put("buffer.memory", 30 * 1024 * 1024);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Parameterized instead of the raw type to avoid an unchecked conversion.
        return new KafkaProducer<String, String>(props);
    }

    /**
     * Builds a String/String consumer for {@code kafka.anyec.cn:9092} in group
     * {@code test} with auto-commit enabled. The consumer is not subscribed yet.
     *
     * @return a new consumer; the caller must close it
     */
    public static Consumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        // Consumer group id.
        props.put("group.id", "test");
        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Auto-commit interval, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout, in milliseconds.
        props.put("session.timeout.ms", "30000");
        // Deserializers for String keys and values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Sends 100000 key-less records with value {@code "v" + i} to {@code topic},
     * waiting up to 100 ms for each send.
     *
     * @param topic destination topic
     */
    public static void sendMsg(String topic) {
        Producer<String, String> producer = getProducer();
        try {
            for (int i = 0; i < 100000; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "v" + i);
                try {
                    producer.send(record).get(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt and stop sending; finally closes the producer.
                    Thread.currentThread().interrupt();
                    return;
                } catch (ExecutionException | TimeoutException e) {
                    // Best effort: report the failed record and carry on with the next.
                    e.printStackTrace();
                }
            }
            producer.flush();
        } finally {
            producer.close();
        }
    }

    /** Prints the message of a failed send; not attached to any send in this class. */
    static Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                System.out.println(e.getMessage());
            }
        }
    };

    /**
     * Subscribes to {@code topic} and then loops forever: poll for up to 10
     * seconds, print each record as {@code key value}, commit. The method only
     * returns by exception, in which case the consumer is closed first.
     *
     * @param topic topic to read from
     */
    public static void receiveMsg(String topic) {
        Consumer<String, String> consumer = getConsumer();
        try {
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " " + record.value());
                }
                consumer.commitSync();
            }
        } finally {
            // Reached only when poll/commitSync throws; release the connection.
            consumer.close();
        }
    }

    /** Shared body of {@link #listTopics()} and {@link #createTopics()}. */
    private static void ensureTopicAndFetchConfigs() {
        ZkUtils zk = getZkClient();
        try {
            if (!AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.createTopic(zk, topicName, 1, 1, new Properties());
            }
            // NOTE(review): the second argument here is the topic name; presumably this
            // API expects an entity type instead - confirm. The result is not used.
            AdminUtils.fetchAllEntityConfigs(zk, topicName);
        } finally {
            // Release the ZooKeeper session that getZkClient() opened.
            zk.close();
        }
    }
}
<!-- Maven build for the anyec Kafka examples (Kafka 0.9.0.1 client and core, Scala 2.10 build). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>anyec</groupId>
  <artifactId>anyec</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <!-- The sources contain non-ASCII comments; pin the encoding so the build
         does not depend on the platform default charset. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- New producer/consumer API (org.apache.kafka.clients). -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.1</version>
    </dependency>
    <!-- Kafka core: SimpleConsumer, AdminUtils, ZkUtils. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.1</version>
    </dependency>
  </dependencies>

  <build>
    <!-- Sources live directly under src/ rather than src/main/java. -->
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>