这几天有人问关于怎么把不同的topic的数据写入到hive的不同的表里,我写了一个简单的demo,大家可以参考一下,
package hive
import java.io.File
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/**
* spark消费多个topic的数据写入不同的hive表
*/
object SparkToHive {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)
val warehouseLocation = new File("hdfs://cluster/hive/warehouse").g