首先确定: 自己的idea开发环境和spark集群的开发环境一致,jdk, scala 版本都一直,而且在idea中测试可以消费到kafka集群中的数据:
集群 spark为1.6.1,本地idea也是spark1.6依赖。并且将sparkstream2kafka程序发布到spark集群也是正常运行,但是获取不到kafka中的数据。后来经过自己大量的测试终于找到了答案(百度了好久没有):
代码:
package spark_api.kafka import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils import spark_api.kafka.datamodel.TestModel import spark_api.kafka.serializer.{StringDecoder, TestModelSerializer} object SparkStreaming2kafkaConsumerCustomTestModeByCreateDirectStream { var status: Boolean = false def main(args: Array[String]): Unit = { var Array(a,b) = args val conf = new SparkConf().setAppName(a).setMaster(b) val scc: StreamingContext = new StreamingContext(conf,Seconds(2)) val sc = scc.sparkContext sc.hadoopConfiguration.addResource("core-site.xml") sc.hadoopConfiguration.addResource("hdfs-site.xml") val kafkaParam = Map("zookeeper.connect"->"node1:2181,node2:2181,node3:2181,node4:2181","group.id"->a,"zookeeper.connection.timeout.ms" -> "100000" ,"metadata.broker.list"->"node1:9092,node2:9092,node3:9092,node4:9092") val topics=Set("TestModel") val kafkaRdd = KafkaUtils.createDirectStream[String,TestModel,StringDecoder,TestModelSerializer](scc,kafkaParam,topics) kafkaRdd.foreachRDD(rdd=>{ println("准备解析数据。。。") consumer2KakfaJQ(rdd) //此方法在 本地 和spark集群都可以正常获得kafka数据, //consumerKafka(rdd)//在 本地 可以获得kafka数据,在spark集群则无效 }) kafkaRdd.start() scc.start() scc.awaitTermination() } private def consumer2KakfaJQ(rdd: RDD[(String, TestModel)]) = { val len = rdd.collect().length println("数据为: " + len) if (len > 0) { rdd.collect().foreach(yz => { //集群可以拿取kafka数据 println("查看数据" + yz._1 + " : " + yz._2) }) }// } private def consumerKafka(rdd: RDD[(String, TestModel)]) = { rdd.foreach(yz => { //集群拿不到kafka数据 println("开始解析数据:数据为: " + yz._2.toString); status = true println("当前线程ID: " + Thread.currentThread().getId) } ) // println("当前线程ID: " + Thread.currentThread().getId) println(status) if (status) { println("数据解析完毕。。。") status = false } else { println("当前获取不到数据!!!") } } }
总结:原来是rdd.foreach 导致在spark集群获取不到kafka中的数据,需要换成 rdd.collect().foreach,就可以获得数据,具体原因上明确,还请大神告知。