Consumer的使用示例代码
//创建soncumer connector ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig()); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //设置topic和监控份数的映射 topicCountMap.put(topic, new Integer(1)); //创建kafkaStream,一个topic可以对应多个kafkaStream。kafkaStream的份数和上面配置的监控份数相同 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); //循环遍历消息 while(it.hasNext()) //处理消息 System.out.println(new String(it.next().message()));
创建ConsumerConnector
/**
 * Builds a ConsumerConnector for callers using the Java API.
 *
 * @param config consumer configuration; at the minimum it needs to specify the groupid of
 *               the consumer and the zookeeper connection string zookeeper.connect.
 * @return a newly constructed kafka.javaapi.consumer.ZookeeperConsumerConnector for `config`
 */
def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector =
  new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
ZookeeperConsumerConnector调用consume方法
/**
 * Creates the message streams for the requested topics, registers this consumer in
 * ZooKeeper and initializes the consumer.
 *
 * @param topicCountMap topic -> number of streams requested for that topic; must not be null
 * @param keyDecoder    decoder handed to every created KafkaStream for message keys
 * @param valueDecoder  decoder handed to every created KafkaStream for message values
 * @return the topic -> streams map held by the load balancer listener
 * @throws RuntimeException if `topicCountMap` is null
 */
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")

  // Resolve the consumer thread ids for each topic
  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
  val threadIdsByTopic = topicCount.getConsumerThreadIdsPerTopic

  // Builds one bounded queue together with the stream that reads from it
  def newQueueAndStream() = {
    val chunkQueue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
    val messageStream = new KafkaStream[K,V](
      chunkQueue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
    (chunkQueue, messageStream)
  }

  // One (queue, stream) pair for every thread id of every topic
  val queuesAndStreams =
    threadIdsByTopic.values.flatMap(threadIds => threadIds.map(_ => newQueueAndStream())).toList

  // Register this consumer's information in ZooKeeper
  val groupDirs = new ZKGroupDirs(config.groupId)
  registerConsumerInZK(groupDirs, consumerIdString, topicCount)

  // Initialize the consumer
  reinitializeConsumer(topicCount, queuesAndStreams)

  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
/**
 * Creates the listeners that do not exist yet, records the queue and streams belonging to
 * every (topic, thread id) pair, subscribes the listeners on the ZooKeeper client and
 * finally triggers a rebalance for this consumer.
 *
 * @param topicCount       source of the topic -> consumer thread id mapping
 * @param queuesAndStreams the (queue, stream) pairs to associate with the thread ids
 */
private def reinitializeConsumer[K,V](
    topicCount: TopicCount,
    queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]): Unit = {

  val groupDirs = new ZKGroupDirs(config.groupId)

  // listener to consumer and partition changes
  if (loadBalancerListener == null) {
    val emptyStreams = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(
      config.groupId,
      consumerIdString,
      emptyStreams.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }

  // create listener for session expired event if not exist yet
  if (sessionExpirationListener == null) {
    sessionExpirationListener = new ZKSessionExpireListener(
      groupDirs, consumerIdString, topicCount, loadBalancerListener)
  }

  // create listener for topic partition change event if not exist yet
  if (topicPartitionChangeListener == null) {
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
  }

  val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams

  // map of {topic -> Set(thread-1, thread-2, ...)}
  val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
    topicCount.getConsumerThreadIdsPerTopic

  val allQueuesAndStreams = topicCount match {
    case _: WildcardTopicCount =>
      // Wild-card consumption streams share the same queues, so the list is repeated
      // once per topic for the subsequent zip operation.
      List.fill(consumerThreadIdsPerTopic.keySet.size)(queuesAndStreams).flatten
    case _: StaticTopicCount =>
      queuesAndStreams
  }

  // Flattened (topic, thread id) pairs. map + flatten is kept on purpose: the pairs must
  // stay a plain collection rather than being rebuilt into a Map keyed by topic.
  val topicThreadIds = consumerThreadIdsPerTopic.map {
    case (topic, threadIds) => threadIds.map(threadId => (topic, threadId))
  }.flatten

  // Check that the number of thread ids matches the number of queue/stream pairs
  require(topicThreadIds.size == allQueuesAndStreams.size,
    "Mismatch between thread ID count (%d) and queue count (%d)"
      .format(topicThreadIds.size, allQueuesAndStreams.size))

  val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

  // Record the queue of every (topic, thread id) pair and expose its size as a gauge
  threadQueueStreamPairs.foreach { case (topicThreadId, (q, _)) =>
    topicThreadIdAndQueues.put(topicThreadId, q)
    debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
    newGauge(
      "FetchQueueSize",
      new Gauge[Int] {
        def value = q.size
      },
      Map("clientId" -> config.clientId,
        "topic" -> topicThreadId._1,
        "threadId" -> topicThreadId._2.threadId.toString)
    )
  }

  // Collect the streams of each topic into the listener's topic -> streams map
  threadQueueStreamPairs.groupBy(_._1._1).foreach { case (topic, pairsOfTopic) =>
    val streams = pairsOfTopic.map(_._2._2).toList
    topicStreamsMap += (topic -> streams)
    debug("adding topic %s and %d streams to map.".format(topic, streams.size))
  }

  // listener to consumer and partition changes
  zkClient.subscribeStateChanges(sessionExpirationListener)
  zkClient.subscribeChildChanges(groupDirs.consumerRegistryDir, loadBalancerListener)

  // register on broker partition path changes
  topicStreamsMap.foreach { case (topic, _) =>
    val topicPath = BrokerTopicsPath + "/" + topic
    zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
  }

  // explicitly trigger load balancing for this consumer
  loadBalancerListener.syncedRebalance()
}
/**
 * Runs rebalance attempts while holding rebalanceLock, up to config.rebalanceMaxRetries
 * times. Returns as soon as an attempt succeeds, or immediately when the consumer is
 * shutting down. After a failed attempt the fetchers for the queues are closed and the
 * thread sleeps for config.rebalanceBackoffMs before the next attempt.
 *
 * @throws ConsumerRebalanceFailedException if no attempt succeeded
 */
def syncedRebalance(): Unit = {
  rebalanceLock synchronized {
    rebalanceTimer.time {
      // Nothing to do once shutdown has started
      if (isShuttingDown.get())
        return

      (0 until config.rebalanceMaxRetries).foreach { attempt =>
        info("begin rebalancing consumer " + consumerIdString + " try #" + attempt)

        // Stays null when getCluster itself fails; it is still passed on below
        var cluster: Cluster = null
        val succeeded =
          try {
            cluster = getCluster(zkClient)
            rebalance(cluster)
          } catch {
            case e: Throwable =>
              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
               * For example, a ZK node can disappear between the time we get all children and the time we try to get
               * the value of a child. Just let this go since another rebalance will be triggered.
               **/
              info("exception during rebalance ", e)
              false
          }

        info("end rebalancing consumer " + consumerIdString + " try #" + attempt)

        if (succeeded)
          return

        /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
         * clear the cache */
        info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")

        // stop all fetchers and clear all the queues to avoid data duplication
        closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(_._2))
        Thread.sleep(config.rebalanceBackoffMs)
      }
    }
  }

  throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}
/**
 * Performs a single rebalance attempt: stops the fetchers, releases the partitions
 * currently owned, asks the partition assignor for a new assignment, fetches the offsets
 * of the assigned partitions, reflects the ownership decision and restarts the fetchers.
 *
 * @param cluster cluster metadata handed to closeFetchers and updateFetcher
 * @return true when the attempt completed (or when no brokers were found and a watch on
 *         the broker ids path was registered instead); false when the consumer is shutting
 *         down, the offsets could not be fetched, or the ownership decision could not be
 *         reflected
 */
private def rebalance(cluster: Cluster): Boolean = {
  val myTopicThreadIdsMap = TopicCount.constructTopicCount(
    group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
  val liveBrokers = getAllBrokersInCluster(zkClient)

  if (liveBrokers.size == 0) {
    // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
    // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
    // are up.
    warn("no brokers found when trying to rebalance.")
    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
    true
  } else {
    /**
     * fetchers must be stopped to avoid data duplication, since if the current
     * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
     * But if we don't stop the fetchers first, this consumer would continue returning data for released
     * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
     */
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    releasePartitionOwnership(topicRegistry)

    // Assign partitions to the worker threads
    val assignmentContext =
      new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
    val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)

    val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
      valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

    // fetch current offsets for all topic-partitions
    val topicPartitions = partitionOwnershipDecision.keySet.toSeq

    fetchOffsets(topicPartitions) match {
      case Some(offsetFetchResponse) if !isShuttingDown.get =>
        // Record every assigned partition, with its fetched offset, in the new registry
        for (topicAndPartition <- topicPartitions) {
          val (topic, partition) = topicAndPartition.asTuple
          val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
          val threadId = partitionOwnershipDecision(topicAndPartition)
          addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
        }

        /**
         * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
         * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
         */
        val ownershipReflected = reflectPartitionOwnershipDecision(partitionOwnershipDecision)
        if (!ownershipReflected) {
          false
        } else {
          allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

          // One gauge per topic reporting how many partitions were assigned for it
          partitionOwnershipDecision.view
            .groupBy { case (topicPartition, consumerThreadId) => topicPartition.topic }
            .foreach { case (topic, partitionThreadPairs) =>
              newGauge("OwnedPartitionsCount",
                new Gauge[Int] {
                  def value() = partitionThreadPairs.size
                },
                ownedPartitionsCountMetricTags(topic))
            }

          topicRegistry = currentTopicRegistry
          updateFetcher(cluster)
          true
        }

      case _ =>
        // Shutting down, or the offsets could not be fetched
        false
    }
  }
}
RangeAssignor分配partition到工作线程
/**
 * Decides, for every topic in ctx.myTopicThreadIds, which partitions each of this
 * consumer's threads should claim, using contiguous ranges of the topic's partitions.
 *
 * @param ctx assignment context providing this consumer's thread ids per topic, the
 *            consumers registered for each topic and the partitions of each topic
 * @return a mutable map from topic-partition to the consumer thread id that should own it
 */
def assign(ctx: AssignmentContext) = {
  val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

  for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
    val curConsumers = ctx.consumersForTopic(topic)
    val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

    // Every consumer gets nPartsPerConsumer partitions; the remainder is handed out
    // one each to the first nConsumersWithExtraPart consumers.
    val nPartsPerConsumer = curPartitions.size / curConsumers.size
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

    info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
      " for topic " + topic + " with consumers: " + curConsumers)

    for (consumerThreadId <- consumerThreadIdSet) {
      val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
      assert(myConsumerPosition >= 0)
      val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

      /**
       * Range-partition the sorted partitions to consumers for better locality.
       * The first few consumers pick up an extra partition, if any.
       */
      if (nParts <= 0)
        warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
      else {
        for (i <- startPart until startPart + nParts) {
          val partition = curPartitions(i)
          info(consumerThreadId + " attempting to claim partition " + partition)
          // record the partition ownership decision
          partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
        }
      }
    }
  }

  // Return the accumulated decision: the caller reads its key set, size and entries.
  partitionOwnershipDecision
}
当计算好当前consumer需要处理的partition之后,调用updateFetcher函数更新fetcher线程
/**
 * Gathers every PartitionTopicInfo held in topicRegistry, logs the selected partitions
 * and, when a fetcher is present, asks it to start connections for them.
 *
 * @param cluster cluster metadata passed through to the fetcher's startConnections
 */
private def updateFetcher(cluster: Cluster): Unit = {
  // update partitions for fetcher
  // Each element is prepended, so the result is in reverse iteration order.
  val allPartitionInfos: List[PartitionTopicInfo] =
    topicRegistry.values.foldLeft(List.empty[PartitionTopicInfo]) { (collected, partitionInfos) =>
      partitionInfos.values.foldLeft(collected) { (acc, partition) => partition :: acc }
    }

  val selectedPartitions = allPartitionInfos
    .sortWith((left, right) => left.partitionId < right.partitionId)
    .map(_.toString)
    .mkString(",")
  info("Consumer " + consumerIdString + " selected partitions : " + selectedPartitions)

  // Does nothing when no fetcher is present
  fetcher.foreach(_.startConnections(allPartitionInfos, cluster))
}
在startConnections方法中会调用addFetcherForPartitions方法,用于启动fetcher线程
/**
 * Groups the given partitions by (broker, fetcher id), creates and starts a fetcher thread
 * for every group that has none yet, and adds each group's partitions with their initial
 * offsets to the group's fetcher thread. All map updates happen while holding mapLock.
 *
 * @param partitionAndOffsets topic-partition -> broker to fetch from and initial offset
 */
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]): Unit = {
  mapLock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(
        brokerAndInitialOffset.broker,
        getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
    }

    partitionsPerFetcher.foreach { case (brokerAndFetcherId, partitionsForFetcher) =>
      // Create, register and start a fetcher thread only when none exists for this key
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(_) =>
        case None =>
          val newThread: AbstractFetcherThread =
            createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, newThread)
          newThread.start
      }

      val initialOffsets = partitionsForFetcher.map { case (topicAndPartition, brokerAndInitOffset) =>
        topicAndPartition -> brokerAndInitOffset.initOffset
      }
      fetcherThreadMap(brokerAndFetcherId).addPartitions(initialOffsets)
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitialOffset) =>
    "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "
  }))
}
然后我们看ConsumerFetcherThread中的processPartitionData方法,这个方法中将获得的数据插入到queue里面。后面stream就可以处理相应的数据了
/**
 * Processes fetched data: checks that the fetch offset matches the offset tracked for the
 * partition and then enqueues the fetched messages on that partition's info object.
 *
 * @param topicAndPartition partition the data was fetched for
 * @param fetchOffset       offset the fetch was issued at
 * @param partitionData     fetch response data whose messages are enqueued
 * @throws RuntimeException if the tracked fetch offset differs from `fetchOffset`
 */
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData): Unit = {
  val partitionInfo = partitionMap(topicAndPartition)

  val offsetsAgree = partitionInfo.getFetchOffset == fetchOffset
  if (!offsetsAgree) {
    val detail = "Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
      .format(topicAndPartition.topic, topicAndPartition.partition, partitionInfo.getFetchOffset, fetchOffset)
    throw new RuntimeException(detail)
  }

  val fetchedMessages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
  partitionInfo.enqueue(fetchedMessages)
}