在使用 Kafka high-level consumer 时,如果多个线程同时消费同一个 ConsumerIterator,就会报错。简单分析一下原因:ConsumerIterator 取不到消息时会阻塞,而在阻塞之前已经将内部状态置为 FAILED;此时其他线程调用 hasNext() 就会抛出 IllegalStateException("Iterator is in failed state")。
/**
 * Reports whether another element is available.
 *
 * Throws IllegalStateException when `state` is FAILED. Otherwise returns
 * false for DONE, true for READY, and delegates to maybeComputeNext() for
 * any other state.
 */
def hasNext(): Boolean = {
  if(state == FAILED) // while in the FAILED state, an access from another thread throws immediately
    throw new IllegalStateException("Iterator is in failed state")
  state match {
    case DONE => false
    case READY => true
    case _ => maybeComputeNext()
  }
}

/**
 * Computes the next element via makeNext() and stores it in `nextItem`.
 *
 * Returns false if `state` is DONE after makeNext() returns; otherwise sets
 * `state` to READY and returns true.
 */
def maybeComputeNext(): Boolean = {
  // state is set to FAILED *before* makeNext() runs and is only changed again
  // after makeNext() returns, so a hasNext() call made while makeNext() is
  // still in progress sees FAILED and throws
  state = FAILED
  nextItem = Some(makeNext())
  if(state == DONE) {
    false
  } else {
    state = READY
    true
  }
}

/**
 * Produces the next message (excerpt; the remainder of the method is omitted).
 *
 * When there is no current iterator, or it has no more elements, a new data
 * chunk is taken from `channel`: with channel.take if consumerTimeoutMs < 0,
 * otherwise with a timed poll. If the poll returns null, resetState() is
 * called and ConsumerTimeoutException is thrown.
 */
protected def makeNext(): MessageAndMetadata[K, V] = {
  var currentDataChunk: FetchedDataChunk = null
  // if we don't have an iterator, get one
  var localCurrent = current.get()
  if(localCurrent == null || !localCurrent.hasNext) {
    if (consumerTimeoutMs < 0)
      currentDataChunk = channel.take // channel is a BlockingQueue, so this call blocks here
    else {
      currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
      if (currentDataChunk == null) {
        // reset state to make the iterator re-iterable
        resetState()
        throw new ConsumerTimeoutException
      }
    }
    // ... part of the code omitted in this excerpt ...
}
多线程消费的正确用法可参考官方的 Consumer Group Example(每个线程消费各自的 KafkaStream):https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example