producer示例代码
// Example: create a producer and send one keyed message.
// Fix: parameterize the declaration type as well — the original used the raw
// type `Producer`, which silently disables generic type checking on send().
Producer<Integer, String> producer =
    new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
// `new String(...)` around a concatenation is redundant; the concatenation
// already yields a fresh String.
String messageStr = "Message_" + messageNo;
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
producer可以支持异步发送和同步发送两种方式
// Select sync vs. async mode from the producer config (fragment from the
// Producer constructor — `sync` and `producerSendThread` are fields declared
// outside this snippet).
//   "sync"  : nothing to set up; `sync` keeps its initial value and send()
//             will invoke eventHandler.handle directly on the caller's thread.
//   "async" : clear the `sync` flag and start the background
//             ProducerSendThread that drains the in-memory `queue`.
config.producerType match {
  case "sync" =>
  case "async" =>
    sync = false
    // Thread name embeds the client id to ease log correlation.
    producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                     queue,
                                                     eventHandler,
                                                     config.queueBufferingMaxMs,
                                                     config.batchNumMessages,
                                                     config.clientId)
    producerSendThread.start()
}
客户端代码调用send方法发送数据。对于sync方式,send会直接调用eventHandler.handle方法同步发送请求;我们先来看异步发送的方式
/**
 * Sends the data, partitioned by key to the topic using either the
 * synchronous or the asynchronous producer.
 *
 * The whole call is serialized on `lock`, so concurrent senders are
 * queued here before touching the handler or the async queue.
 *
 * @param messages the producer data objects that encapsulate the topic,
 *                 key and message data
 * @throws ProducerClosedException if the producer has already been shut down
 */
def send(messages: KeyedMessage[K,V]*) {
  lock synchronized {
    if (hasShutdown.get)
      throw new ProducerClosedException
    recordStats(messages)
    // `match` on a Boolean is an anti-pattern; a plain if/else reads better
    // and behaves identically.
    if (sync)
      eventHandler.handle(messages)  // synchronous: block on the caller's thread
    else
      asyncSend(messages)            // asynchronous: enqueue for ProducerSendThread
  }
}
异步方法发送数据到queue
/**
 * Enqueues messages onto the in-memory queue for the background
 * ProducerSendThread to pick up.
 *
 * Blocking behavior is controlled by config.queueEnqueueTimeoutMs:
 *   == 0 : never block — offer() and fail immediately on a full queue
 *   <  0 : block indefinitely — put()
 *   >  0 : block up to the timeout — offer(timeout, MILLISECONDS)
 *
 * @param messages the messages to enqueue
 * @throws QueueFullException if any message could not be enqueued; note the
 *         remaining messages in the batch are then not attempted
 */
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
  for (message <- messages) {
    // Replaced the original's nested Boolean `match`es with if/else — the
    // enqueue semantics are unchanged.
    val added =
      if (config.queueEnqueueTimeoutMs == 0) {
        // Non-blocking: returns false when the queue is full.
        queue.offer(message)
      } else {
        try {
          if (config.queueEnqueueTimeoutMs < 0) {
            queue.put(message)  // blocks until space is available
            true
          } else {
            queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
          }
        } catch {
          // An interrupt while waiting counts as a failed enqueue; the
          // QueueFullException below reports it (original behavior kept).
          case e: InterruptedException => false
        }
      }
    if (!added) {
      producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
      producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
      throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
    } else {
      trace("Added to send queue an event: " + message.toString)
      trace("Remaining queue size: " + queue.remainingCapacity)
    }
  }
}
异步处理事件的线程ProducerSendThread,这个线程会从queue中获取事件,当累积的事件个数达到batch大小(batch.num.messages)或者等待超过queue.buffering.max.ms时,就调用 handler.handle(events)方法去处理事件
// Main loop of ProducerSendThread: drain the queue, batching events until
// either `batchSize` events have accumulated or `queueTime` ms have elapsed
// since the last dispatch, then hand the batch to the event handler.
// The loop terminates when the sentinel `shutdownCommand` is dequeued.
private def processEvents() {
  var lastSend = SystemTime.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false

  // drain the queue until you get a shutdown command.
  // poll() waits at most until the current batch's deadline (lastSend +
  // queueTime); a null result therefore means "queue time expired".
  Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                    .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
    currentQueueItem =>
      val elapsed = (SystemTime.milliseconds - lastSend)
      // check if the queue time is reached. This happens when the poll method above returns after a timeout and
      // returns a null object
      val expired = currentQueueItem == null
      if(currentQueueItem != null) {
        trace("Dequeued item for topic %s, partition key: %s, data: %s"
            .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
        events += currentQueueItem
      }

      // check if the batch size is reached
      full = events.size >= batchSize

      if(full || expired) {
        if(expired)
          debug(elapsed + " ms elapsed. Queue time reached. Sending..")
        if(full)
          debug("Batch full. Sending..")
        // if either queue time has reached or batch size has reached, dispatch to event handler
        tryToHandle(events)
        lastSend = SystemTime.milliseconds
        // start a fresh batch rather than clearing, so tryToHandle keeps its
        // own reference to the dispatched buffer
        events = new ArrayBuffer[KeyedMessage[K,V]]
      }
  }
  // send the last batch of events accumulated before shutdown
  tryToHandle(events)
  // after the shutdown sentinel, nothing may remain enqueued
  if(queue.size > 0)
    throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
      .format(queue.size))
}
DefaultEventHandler的handle方法处理事件
/**
 * Entry point of DefaultEventHandler: serializes the events and dispatches
 * them to brokers, retrying failed partitions up to
 * config.messageSendMaxRetries times with config.retryBackoffMs back-off.
 *
 * Fix applied: the info(...) string literal was broken across two lines by a
 * raw newline (invalid Scala); restored to a single literal. Logic unchanged.
 *
 * @param events the messages to send
 * @throws FailedToSendMessageException if some messages still fail after all
 *         retries are exhausted
 */
def handle(events: Seq[KeyedMessage[K,V]]) {
  val serializedData = serialize(events)
  // account the serialized payload bytes per topic and in aggregate
  serializedData.foreach {
    keyed =>
      val dataSize = keyed.message.payloadSize
      producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
      producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
  }
  var outstandingProduceRequests = serializedData
  var remainingRetries = config.messageSendMaxRetries + 1
  val correlationIdStart = correlationId.get()
  debug("Handling %d events".format(events.size))
  while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
    topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
    // Periodic refresh: if the metadata refresh interval elapsed, update
    // topic metadata and drop the sticky-partition cache.
    if (topicMetadataRefreshInterval >= 0 &&
        SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
      Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      topicMetadataToRefresh.clear
      lastTopicMetadataRefreshTime = SystemTime.milliseconds
    }
    // Dispatch; whatever comes back failed to send and becomes the next
    // iteration's work list.
    outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
    // On failure: back off, then refresh metadata only for the failed topics.
    if (outstandingProduceRequests.size > 0) {
      info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
      // back off and update the topic metadata cache before attempting another send operation
      Thread.sleep(config.retryBackoffMs)
      // get topics of the outstanding produce requests and refresh metadata for those
      Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet,
                                                       correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      remainingRetries -= 1
      producerStats.resendRate.mark()
    }
  }
  // Retries exhausted with messages still outstanding: report and fail.
  if(outstandingProduceRequests.size > 0) {
    producerStats.failedSendRate.mark()
    val correlationIdEnd = correlationId.get()
    error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
      .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","), correlationIdStart, correlationIdEnd-1))
    throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
  }
}
// Groups the serialized messages by destination broker/partition and sends
// one request per broker; returns the subset of messages that failed and
// should be retried by the caller (handle()).
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  // Map each message to its target broker, yielding
  // broker -> (topic/partition -> messages); None means partitioning itself
  // failed for the whole batch.
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          // Compress/group the per-partition messages into message sets.
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)

          // Send over the wire (NIO-based BlockingChannel under the hood);
          // returns the topic-partitions whose produce failed.
          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        // NOTE(review): catching Throwable swallows fatal errors (OOM etc.)
        // and reports the whole batch as merely "failed"; consider NonFatal.
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}