package com.day_212
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* TODO
*
* @author 徐磊
* @email [email protected]
* @data2020/02/12 下午 07:48
* @最终需求效果:
*/
object One extends App {
val con = new SparkConf().setAppName("rddmysql").setMaster("local[2]")
val sc = new SparkContext(con)
//********************************直接从mysql中读取数据(并且加条件)
//1、获取数据库连接的函数
val getcon=()=>{
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://node132:3306/xss","root","root")
}
//2、创建读取RDD
val jdbcmysql=new JdbcRDD(
sc, //sc程序入口
getcon, // 数据库连接
"select * from student where id >=? and id <=?", //条件查询语句
2, //最小值
3, //最大值
2, //partition分区
ressultSet=>{//返回内容(获取内容) ressultSet是自己起的别名,可以随便起 id和age的下标是由它在你数据库中的位置决定的
val id =ressultSet.getInt(1)
val age =ressultSet.getInt(2)
(id,age) //返回
}
)
// ,然后再把age等于18的数据取出
val jdbcmysql2 = jdbcmysql.filter(x=>x._2==18)
//再把过滤后的数据保存到mysql中(自己在mysql中创建数据,有id和age)
println(jdbcmysql2.collect().toBuffer)
//*****************************将数据用RDD直接存到mysql中
jdbcmysql2.foreachPartition(x=>{
val con = DriverManager.getConnection("jdbc:mysql://node132:3306/xss","root","root") //建立连接
val eq = con.prepareStatement("insert into ssss values(?,?)") //添加sql语句。需要提前在mysql中建好结构相同的表
x.foreach(t=>{
eq.setInt(1,t._1)
eq.setInt(2,t._2)
eq.executeUpdate()
})
eq.close()
con.close()
})
sc.stop()
}
从spark直接从Mysql中读取数据形成RDD,然后计算完成存入mysql
猜你喜欢
转载自blog.csdn.net/qq_44472134/article/details/104319823
今日推荐
周排行