def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("BatchLoadThroughKafka")
val carbon: SparkSession = SparkCarbonSessionSingleton.getInstance(conf, storePath)
// val carbon: SparkSession = SparkSession.builder.master("local[*]").getOrCreate()
val streaming = new StreamingContext(carbon.sparkContext, Seconds(10))
// Define the Kafka topic(s) to consume
val topics = Set("carbon_file_topic")
/**
 * PreferConsistent: distribute the partitions evenly across all executors, so every executor consumes Kafka data.
 * PreferBrokers: usable only when the executors run on the same nodes as the Kafka brokers.
 * Subscribe: pass in the concrete Kafka parameters (topics and consumer configuration).
 */
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
streaming,
PreferConsistent,
Subscribe[String, String](topics, getkafkaParams()))
kafkaStream.map(record => (record.key, record.value)).map(kv => {
val path: String = kv._2
if (StringUtils.isNotBlank(path)) {
try {
val tbl: String = getFile2Tbl(path)
println("tbl=============>" + tbl)
val bool: Boolean = getFileSystem.exists(new Path(getTablePath(tbl, null)))
println("bool=============>" + bool)
if (bool) {
load(carbon, path, tbl)
} else {
createAndLoad(carbon, path, tbl)
}
} catch {
case e: FileNameErrorExcepation =>
//Store the path of the file whose load failed in MySQL, together with the exception details, so a scheduled job can retry the load later
//RecordFailLoadFile.record(path, e, carbon)
e.printStackTrace()
case e: Exception =>
//Store the path of the file whose load failed in MySQL, together with the exception details, so a scheduled job can retry the load later
//RecordFailLoadFile.record(path, e, carbon)
e.printStackTrace()
}
}
}).print(1)
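// Note: print(1) is the only output operation here and only computes enough of each batch
// to display one record, so the load calls inside the map above are not guaranteed to run
// for every message; foreachRDD is the usual pattern when every record must be processed
// (see the sketch after the Solution note below).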
streaming.start()
streaming.awaitTermination()
}
def getkafkaParams(): Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> BOOTSTRAP_SERVERS, //指定Kafka的集群地址
"key.deserializer" -> classOf[StringDeserializer], //指定key的反序列化器
"value.deserializer" -> classOf[StringDeserializer], //指定值的反序列化器
"group.id" -> "use_a_separate_group_id_for_each_stream", //consumer的分组id
"auto.offset.reset" -> "latest", //从新定义消费者以后,不从头消费分区里的数据,只消费定义消费者以后的数据
"enable.auto.commit" -> (true: java.lang.Boolean) //是否自动提交offsets,也就是更新kafka里的offset,表示已经被消费过了
)
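The helper methods called above (getFile2Tbl, getTablePath, getFileSystem, load, createAndLoad) are not shown in this post. Below is a rough, hypothetical sketch of three of them, assuming the target table name is derived from the file name and that data is loaded with CarbonData's SQL LOAD DATA command; the placeholder schema and the DDL flavour (STORED AS carbondata on recent releases, STORED BY 'carbondata' on older 1.x ones) are assumptions that will differ per deployment.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Hypothetical helpers -- not the original implementation.
def getFile2Tbl(path: String): String = {
  // Assumption: table name = file name without its extension.
  val fileName = new Path(path).getName
  val dot = fileName.lastIndexOf('.')
  (if (dot > 0) fileName.substring(0, dot) else fileName).toLowerCase
}

def load(carbon: SparkSession, path: String, tbl: String): Unit = {
  // CarbonData bulk load; options such as DELIMITER/FILEHEADER would be added as needed.
  carbon.sql(s"LOAD DATA INPATH '$path' INTO TABLE $tbl")
}

def createAndLoad(carbon: SparkSession, path: String, tbl: String): Unit = {
  // Placeholder schema -- in practice it would be configured or inferred per file.
  carbon.sql(s"CREATE TABLE IF NOT EXISTS $tbl (id STRING, value STRING) STORED AS carbondata")
  load(carbon, path, tbl)
}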
Solution:
Write the code exactly as the example in the official documentation does. The key API in the code above is the call below; only by first mapping each ConsumerRecord to its key and value this way can the data actually be consumed.
kafkaStream.map(record => (record.key, record.value))
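For reference, here is a minimal, self-contained sketch of that pattern, following the spark-streaming-kafka-0-10 integration guide. The topic name, broker list, and group id are placeholders, and it uses foreachRDD instead of print so that every record in each batch is actually processed.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MinimalKafkaConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MinimalKafkaConsumer").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",            // placeholder broker list
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "demo_group",                          // placeholder group id
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Set("carbon_file_topic"), kafkaParams))

    // Map each ConsumerRecord (not serializable) to a plain (key, value) tuple first,
    // then process every record of every batch with foreachRDD.
    stream.map(record => (record.key, record.value))
      .foreachRDD(rdd => rdd.foreach { case (_, path) => println(s"received path: $path") })

    ssc.start()
    ssc.awaitTermination()
  }
}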