Error when Spark Streaming pulls data from Kafka (kafka 2.11):
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2244)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2228)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1532)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1513)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The Spark Streaming consumer code is as follows:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserFromKafka { // enclosing object added for completeness; the name is assumed
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR) // quiet Spark's own logging
    val conf = new SparkConf()
      .setAppName("userFromKafka")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092,localhost:9093,localhost:9094",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yc74",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("topicUser", "topicB")
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams) // subscribe to the set of topics
    )
    // Split the received Kafka records into an address DStream and a phone DStream:
    // filter on the message type and handle the two kinds of records separately.
    val nameAddressStream = stream.map(_.value).filter(record => {
      val items = record.split("\t")
      items(2).toInt == 0 // the third field (type) is 0 for address records
    }).map(record => {
      val items = record.split("\t")
      (items(0), items(1)) // keep only (name, addr)
    })
    // phone records
    val namePhoneStream = stream.map(_.value).filter(record => {
      val items = record.split("\t")
      items(2).toInt == 1
    }).map(record => {
      val items = record.split("\t")
      (items(0), items(1))
    })
    // join the two DStreams into (name, (addr, phone))
    val result = nameAddressStream.join(namePhoneStream).map(record => {
      s"name: ${record._1}, addr: ${record._2._1}, phone: ${record._2._2}"
    })
    // output
    result.print()
    ssc.start()
    ssc.awaitTermination() // block until the streaming computation is stopped
  }
}
Problem analysis: the error occurs because the same Kafka stream is filtered more than once and the resulting branches are then joined.
The two branches read the same underlying data, so when the action runs, each Kafka partition is read twice within the same batch (one partition of the KafkaRDD corresponds to one partition of the topic). The spark-streaming-kafka-0-10 integration caches a single KafkaConsumer per topic partition on each executor, and KafkaConsumer is not thread-safe, so when two tasks read the same partition concurrently they collide on that cached consumer and the exception above is thrown.
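Condensed to its essentials, the failing pattern in the code above looks like this (a sketch that reuses the stream and the tab-separated field layout from the listing above, not new logic):

// Two branches over the same, uncached Kafka stream: each branch carries the
// full KafkaRDD in its lineage.
val addrBranch = stream.map(_.value)
  .filter(_.split("\t")(2).toInt == 0)
  .map { v => val f = v.split("\t"); (f(0), f(1)) }
val phoneBranch = stream.map(_.value)
  .filter(_.split("\t")(2).toInt == 1)
  .map { v => val f = v.split("\t"); (f(0), f(1)) }

// The join forces both branches to be evaluated in the same batch, so every
// Kafka topic partition is read twice, and the two reading tasks hit the same
// cached KafkaConsumer at the same time -> ConcurrentModificationException.
addrBranch.join(phoneBranch).print()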
Solution: one workable fix is to cache (or checkpoint) the stream, so that the original Kafka data is consumed only once per batch and every subsequent computation reads from the cache instead of going back to the consumer.
val rdds = stream.cache()
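Building on that one-liner, here is a minimal sketch of the fix under the same tab-separated schema as the code above. One common variation, used here, is to map to the record value first and cache that DStream, so the raw ConsumerRecord objects never need to be serialized; both branches are then derived from the cached DStream:

// Consume each Kafka partition once per batch, then reuse the cached values.
val values = stream.map(_.value).cache()

val nameAddressStream = values
  .filter(_.split("\t")(2).toInt == 0)                // type 0 = address record
  .map { v => val f = v.split("\t"); (f(0), f(1)) }   // (name, addr)

val namePhoneStream = values
  .filter(_.split("\t")(2).toInt == 1)                // type 1 = phone record
  .map { v => val f = v.split("\t"); (f(0), f(1)) }   // (name, phone)

nameAddressStream.join(namePhoneStream)
  .map { case (name, (addr, phone)) => s"name: $name, addr: $addr, phone: $phone" }
  .print()

With the cached DStream in place, each Kafka partition is fetched exactly once per batch; the second branch is served from Spark's block manager rather than from the shared KafkaConsumer, so the concurrent access, and the exception, disappear.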