// Write records into MySQL over JDBC from an RDD.
object Spark2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("hu")
    // Create the Spark context.
    val sc = new SparkContext(conf)

    // JDBC connection settings.
    val url = "jdbc:mysql://hadoop01:3306/spark?characterEncoding=utf-8"
    val username = "root"
    val pass = "123456"

    val list = List((1, "张三", 20), (2, "李四", 30), (3, "王五", 40))
    val rdd: RDD[(Int, String, Int)] = sc.makeRDD(list)

    // One connection per partition: JDBC connections are not serializable,
    // so they must be opened inside foreachPartition, never on the driver.
    rdd.foreachPartition { data =>
      val connection = java.sql.DriverManager.getConnection(url, username, pass)
      try {
        // Run the DDL once per partition (the original executed it once per
        // row, re-preparing both statements for every record).
        val ddl = connection.prepareStatement(
          "create table if not exists user(id int auto_increment,name varchar(10),age int,primary key(id)) charset='utf8'")
        try ddl.execute() finally ddl.close()

        // Prepare the insert once and batch all rows of the partition.
        val insert = connection.prepareStatement("insert into user (id,name, age) values (?, ?, ?)")
        try {
          data.foreach { case (id, name, age) =>
            insert.setInt(1, id)
            insert.setString(2, name)
            insert.setInt(3, age)
            insert.addBatch()
          }
          insert.executeBatch()
        } finally insert.close()
      } finally {
        // Always release the connection, even when an insert fails
        // (the original leaked it on any SQL exception).
        connection.close()
      }
    }

    sc.stop() // release Spark resources (missing in the original)
  }
}
// Read records back out of MySQL using Spark's JdbcRDD.
object JDBCRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("hu")
    // Create the Spark context.
    val sc = new SparkContext(conf)

    // JDBC connection settings.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop01:3306/spark"
    val uname = "root"
    val pass = "123456"

    // The query MUST contain exactly two '?' placeholders: JdbcRDD fills
    // them with the per-partition lower/upper bounds. An unbounded query
    // ("select name ,age from user") fails at runtime.
    val sql = "select name,age from user where id >=? and id <=?"

    // JdbcRDD(
    //   sc: SparkContext,
    //   getConnection: () => Connection,
    //   sql: String,
    //   lowerBound: Long,
    //   upperBound: Long,
    //   numPartitions: Int,
    //   mapRow: (ResultSet) => T)
    //
    // mapRow now returns the row data instead of Unit: the original printed
    // inside mapRow on the executors, which only appears to work in local
    // mode and made collect() yield a useless Array[Unit].
    val jdbcRDD: JdbcRDD[(String, String)] = new JdbcRDD(
      sc,
      () => {
        Class.forName(driver)
        java.sql.DriverManager.getConnection(url, uname, pass)
      },
      sql,
      1, // lower bound
      3, // upper bound
      2, // number of partitions
      rs => (rs.getString(1), rs.getString(2)) // column indexes are 1-based
    )

    // Collect to the driver and print there (same "name , age" format).
    jdbcRDD.collect().foreach { case (name, age) => println(name + " , " + age) }

    sc.stop() // release Spark resources (missing in the original)
  }
}