- package sparkstreamday01.SparkStream
- import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming.dstream.ReceiverInputDStream
- import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming word count whose per-word totals are accumulated in MySQL.
 *
 * Text lines are received from a socket stream, split on single spaces and counted
 * per 4-second batch. Every batch's counts are printed and then added to the running
 * totals stored in the `wordcount3` table (created on demand).
 */
object StreamRddsql3 {
  import scala.util.control.NonFatal

  // Set the log level of the "org" logger hierarchy to WARN.
  Logger.getLogger("org").setLevel(Level.WARN)

  // NOTE(review): connection settings and credentials are hard-coded in source;
  // consider loading them from configuration instead.
  private val JdbcUrl = "jdbc:mysql://localhost:3306/wjy1?characterEncoding=utf-8"
  private val JdbcUser = "wjy1"
  private val JdbcPassword = "351991"

  private val CreateTableSql = "create table if not exists wordcount3 (word varchar(20),total int)"
  private val SelectSql = "select * from wordcount3 where word=?"
  private val UpdateSql = "update wordcount3 set total=? where word=?"
  private val InsertSql = "insert into wordcount3 values(?,?)"

  /**
   * Starts the streaming job and blocks until it is terminated.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Local-run settings.
    val conf: SparkConf = new SparkConf()
      .setMaster("local[3]") // at least two cores: one to receive, one to process
      .setAppName(this.getClass.getSimpleName)

    val ssc = new StreamingContext(conf, Seconds(4))

    // Receive lines from the socket and create the DStream.
    val text: ReceiverInputDStream[String] = ssc.socketTextStream("Linux00", 9999)

    text.foreachRDD { rdd =>
      val result: RDD[(String, Int)] = rdd
        .flatMap(_.split(" "))
        // Consecutive spaces and blank lines yield empty tokens; they are not words.
        .filter(_.nonEmpty)
        .map((_, 1))
        .reduceByKey(_ + _)

      result.foreach(println)

      // Write the batch to the database, one connection per partition.
      result.foreachPartition(records => savePartition(records))
    }

    // Start the job.
    ssc.start()
    // Block until the job is stopped.
    ssc.awaitTermination()
  }

  /**
   * Adds the counts of one partition to the totals stored in `wordcount3`.
   *
   * Nothing is opened for an empty partition. All row changes of the partition are
   * applied in a single transaction, so a failed task leaves no partial writes behind
   * that a retry would count a second time (assumes the table uses a transactional
   * storage engine -- TODO confirm). The connection and all statements are closed
   * even when a statement fails.
   *
   * @param records the (word, count) pairs of the partition
   */
  private def savePartition(records: Iterator[(String, Int)]): Unit =
    if (records.hasNext) {
      withResource(DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)) { conn =>
        // DDL is issued before the transaction starts.
        withResource(conn.prepareStatement(CreateTableSql)) { create =>
          create.executeUpdate()
        }

        conn.setAutoCommit(false)
        try {
          // Prepare each statement once per partition instead of once per record.
          withResource(conn.prepareStatement(SelectSql)) { select =>
            withResource(conn.prepareStatement(UpdateSql)) { update =>
              withResource(conn.prepareStatement(InsertSql)) { insert =>
                records.foreach { case (word, count) =>
                  addCount(select, update, insert, word, count)
                }
              }
            }
          }
          conn.commit()
        } catch {
          case NonFatal(e) =>
            // Keep the original failure as the primary error if rollback fails too.
            try conn.rollback()
            catch { case NonFatal(rollbackError) => e.addSuppressed(rollbackError) }
            throw e
        }
      }
    }

  /**
   * Adds `count` to the stored total of `word`, inserting a new row when the word is
   * not in the table yet.
   *
   * @param select prepared `SelectSql` statement
   * @param update prepared `UpdateSql` statement
   * @param insert prepared `InsertSql` statement
   * @param word   the word to update
   * @param count  occurrences of the word in the current batch
   */
  private def addCount(
      select: PreparedStatement,
      update: PreparedStatement,
      insert: PreparedStatement,
      word: String,
      count: Int
  ): Unit = {
    select.setString(1, word)
    val storedTotal: Option[Int] = withResource(select.executeQuery()) { rows: ResultSet =>
      if (rows.next()) Some(rows.getInt("total")) else None
    }

    storedTotal match {
      case Some(total) =>
        // The word already exists: new total = stored total + count of this batch.
        update.setInt(1, total + count)
        update.setString(2, word)
        update.executeUpdate()
      case None =>
        insert.setString(1, word)
        insert.setInt(2, count)
        insert.executeUpdate()
    }
    ()
  }

  /** Runs `body` with `resource` and closes the resource afterwards, even on failure. */
  private def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()
}
把 Spark Streaming 版本的 WordCount 统计结果写入 MySQL 数据库
猜你喜欢
转载自blog.csdn.net/weixin_40155674/article/details/80708052
今日推荐
周排行