前言
Structured Streaming出来有几年了,一直没有机会使用,最近闲来无事,就想先测试一下,完全没有细看关于它的一些详细介绍情况,仅仅想根据官网案例,执行一遍,没想到…
copy官网的一小段代码
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
然后执行
报错1:
Structured Streaming Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
解决方法参考这里
大概原因:
缺少kafka相应的jar包
下载spark-sql-kafka-0-10_2.XXjar包
报错2:
ERROR streaming.MicroBatchExecution: Query [id = 9ee668b3-a4dc-482a-8414-ad9acce355b6, runId = 3debc773-22b5-48f7-b02d-23e13f131a62] termin
ated with error
解决方法参考这里
大概原因:
我在maven中的spark-sql-kafka-0-10_2.12版本中后面的2.12是scala的版本,而本地scala是2.11,因此这个依赖需要写成spark-sql-kafka-0-10_2.11,并且下载的jar包也应该于此对应
报错3:
exception in thread "main" org.apache.spark.sql.analysisexception: queries with streaming sources must be executed with writestream.start();
解决方法参考这里
大概原因:
就是需要调用writestream.start()方法才能执行structured streaming代码,类似于:
df.writeStream
.outputMode("update")
.format("console")
.start()
报错4:
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
解决方法参考这里
大概原因:
执行spark-submit之前需要先执行:
export SPARK_KAFKA_VERSION=0.10
不然默认是0.9版本,这个可以在CDH中配置
报错5:
ERROR streaming.MicroBatchExecution: Query [id = 701adf83-7b60-4093-8be0-3551d65f9313, runId = daa85c70-b30d-4dcb-99b2-94fbf70769c6] termin
ated with errorjava.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
解决方法参考这里
大概原因:
spark structured streaming代码中必须得有awaitAnyTermination()方法,于是加入了:
spark.streams.awaitAnyTermination()
后记
这里仅仅是一个简单测试,竟然出现了如此多的问题,最主要是没有搞懂原理以及如何使用就想依葫芦画瓢