import java.sql.{Connection, DriverManager}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming word count that persists each batch's results to MySQL.
 *
 * Comma-separated text is read from a socket (hadoop000:9997) in 10-second
 * batches, the tokens are counted, and every (word, count) pair is inserted
 * into the `wc(word, c)` table.
 */
object ForeachRDDApp {

  /** Parameterized insert; values are bound, never spliced into the SQL text. */
  private val InsertSql = "insert into wc(word, c) values (?, ?)"

  /**
   * Builds the streaming pipeline, starts it, and blocks until termination.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("ForeachRDDApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val lines = ssc.socketTextStream("hadoop000", 9997)
    val results = lines.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)

    // Write results to MySQL.
    // Anti-pattern (do not use): rdd.foreach(record => createConnection() ...)
    // opens one connection per record. Best practice is one connection per
    // partition, released when the partition has been written.
    results.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // Skip empty partitions so no connection is opened for nothing.
        if (partition.nonEmpty) {
          writePartition(partition)
        }
      })
    })

    // start() is mandatory: nothing is processed until it is called.
    ssc.start()
    // lines.print()
    ssc.awaitTermination()
  }

  /**
   * Inserts every (word, count) pair of one partition using a single
   * connection and a single prepared statement.
   *
   * Both resources are closed even when an insert fails; the exception is
   * propagated to the caller.
   *
   * @param records the (word, count) pairs of one RDD partition
   */
  private def writePartition(records: Iterator[(String, Int)]): Unit = {
    val connection = createConnection()
    try {
      val statement = connection.prepareStatement(InsertSql)
      try {
        records.foreach { case (word, count) =>
          // Bound parameters keep quotes in `word` from breaking or injecting SQL.
          statement.setString(1, word)
          statement.setInt(2, count)
          statement.executeUpdate()
        }
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
  }

  /**
   * Loads the MySQL JDBC driver class and opens a new connection to the
   * `ss2` database on hadoop000:3306.
   *
   * @return a newly opened connection; the caller is responsible for closing it
   */
  def createConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://hadoop000:3306/ss2", "root", "root")
  }
}
Spark Streaming - 写入到 MySQL(foreachRDD)
猜你喜欢
转载自blog.csdn.net/qq_15300683/article/details/80689998
今日推荐
周排行