import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} import org.json4s.ShortTypeHints import org.json4s.jackson.Serialization import org.json4s.jackson.JsonMethods._ import scala.util.parsing.json.JSON object Test01 { case class Person(name:String,age:Int) def main(args: Array[String]): Unit = { val conf=new SparkConf().setAppName("test01").setMaster("local") val sc = new SparkContext(conf) /**SparkContext中textFile(),参数为 文件目录*/ val input = sc.textFile("E:\\test\\test01.txt") input.foreach(println) println("*******①*******") /**SparkContext中wholeTextFiles(),参数为 路径,返回值是pairRDD*/ val input2=sc.wholeTextFiles("E:\\test") input2.foreach(println) println("*******②*******") val input3=sc.textFile("E:\\test\\test03.json") val result3=input3.map(s=>JSON.parseFull(s)) result3.foreach( { r=>r match{ case Some(map:Map[String,Any])=>println(map) case None => println("parsing failed!") case other =>println("unknow data structure"+ other) } } ) println("*******④*******") /**使用json4s来解析JSON文件*/ implicit val formats=Serialization.formats(ShortTypeHints(List())) val input4=sc.textFile("E:\\test\\test03.json") input4.collect().foreach(x => { var c = parse(x).extract[Person] println(c.name +","+ c.age) }) println("*******⑤*******") val sqlContext=new SQLContext(sc) val df = sqlContext.read.json("E:\\test\\test03.json") df.show(false) df.printSchema() df.select("name").show(false) df.select("name","age").show(false) df.filter(df("age")>15).show(false) println("*******⑥*******") //可以用saveASTextFile(path)保存json文件 val datasave=input4.map{myrecord=> implicit val formats=DefaultFormats val jsonObj = parse(myrecord) jsonObj.extract[Person] } datasave.saveAsTextFile("E:\\test\\savejson") //会在路径E:\test下生成路径savejson并在里面生成part-0000文件; //但如果已经存在test/savejson就会报错 println("******⑦******") } }
Spark-文件操作读取保存
猜你喜欢
转载自blog.csdn.net/u012761191/article/details/80657525
今日推荐
周排行