读取MySQL的数据
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
 * Example job: reads the `student` table of the local MySQL database `test1`
 * over JDBC into a DataFrame and prints its leading rows to the console.
 */
object sparkRead {
  /**
   * Starts a local two-thread Spark session, loads the table and shows it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").appName("sparkjdbc").getOrCreate()
    try {
      // Keep Spark's own logging quiet so the table output stays readable.
      sparkSession.sparkContext.setLogLevel("WARN")

      val url = "jdbc:mysql://localhost:3306/test1"
      val table = "student"

      // NOTE(review): credentials are hard-coded for this local demo; move them
      // to configuration before using this against a real database.
      val properties = new Properties()
      properties.setProperty("user", "root")
      properties.setProperty("password", "root")

      val dataFrame = sparkSession.read.jdbc(url, table, properties)
      dataFrame.show()
    } finally {
      // Always release the session, even when the JDBC read fails.
      // stop() also shuts down the underlying SparkContext, so a separate
      // context.stop() followed by close() is unnecessary.
      sparkSession.stop()
    }
  }
}
将数据写入MySQL
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
 * One record of the person data set that `sparkWrite` parses from text and
 * stores in MySQL.
 *
 * @param id   numeric identifier (first field of an input line)
 * @param name person name (second field of an input line)
 * @param age  age (third field of an input line)
 */
case class employee(
  id: Int,
  name: String,
  age: Int
)
/**
 * Example job: parses a local text file of space-separated `id name age`
 * records into [[employee]] rows and writes them to the `person` table of the
 * local MySQL database `test1` over JDBC.
 */
object sparkWrite {
  /**
   * Starts a local two-thread Spark session, loads and parses the text file,
   * and saves the result to MySQL with `SaveMode.Overwrite`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").appName("sparkjdbc").getOrCreate()
    try {
      val context = sparkSession.sparkContext
      context.setLogLevel("WARN")

      val file: RDD[String] = context.textFile("file:///C:\\Users\\Administrator\\Documents\\tt\\person.txt")
      // Skip blank lines (e.g. a trailing empty line); otherwise "".toInt
      // would abort the whole job with a NumberFormatException.
      val map: RDD[Array[String]] = file.filter(_.trim.nonEmpty).map(_.split(" "))
      // Each remaining line is expected to hold exactly: id name age.
      val personRDD: RDD[employee] = map.map(x => employee(x(0).toInt, x(1), x(2).toInt))

      import sparkSession.implicits._
      val personDF: DataFrame = personRDD.toDF()

      // Register personDF as a temporary view so it can be queried with SQL.
      // createOrReplaceTempView replaces registerTempTable, which has been
      // deprecated since Spark 2.0.
      personDF.createOrReplaceTempView("person")
      val personDatas: DataFrame = sparkSession.sql("select * from person")

      val url = "jdbc:mysql://localhost:3306/test1"
      val table = "person"

      // NOTE(review): credentials are hard-coded for this local demo; move them
      // to configuration before using this against a real database.
      val properties = new Properties()
      properties.setProperty("user", "root")
      properties.setProperty("password", "root")

      // Save the data to the database, overwriting any existing table contents.
      personDatas.write.mode(SaveMode.Overwrite).jdbc(url, table, properties)
    } finally {
      // Always release the session, even when parsing or the JDBC write fails.
      // stop() also shuts down the underlying SparkContext, so a separate
      // context.stop() followed by close() is unnecessary.
      sparkSession.stop()
    }
  }
}