在Spark生态体系中,SparkCore,SparkSql,SparkStream都有自己的输入输出,他们之间可以相互转换。相对来说。SparkCore的输入输出做的不是特别规范,SparkSql的输入输出做的比较规范,下面我们来学习的是SparkCore的输入输出。
1.文本文件输入输出
当我们将一个文本文件读取为 RDD 时,输入的每一行 都会成为RDD的一个元素。也可以将多个完整的文本文件一次性读取为一个pair RDD, 其中键是文件名,值是文件内容。
val input = sc.textFile("./README.md")
如果传递目录,则将目录下的所有文件读取作为RDD。
文件路径支持通配符。
通过wholeTextFiles()对于大量的小文件读取效率比较高,大文件效果没有那么高。
Spark通过saveAsTextFile() 进行文本文件的输出,该方法接收一个路径,并将 RDD 中的内容都输入到路径对应的文件中。Spark 将传入的路径作为目录对待,会在那个 目录下 输出多个文件。这样,Spark 就可以从多个节点上并行输出了。
result.saveAsTextFile(outputFile)
scala> sc.textFile("./README.md")
res6: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[7] at textFile at <console>:25
scala> val readme = sc.textFile("./README.md")
readme: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[9] at textFile at <console>:24
scala> readme.collect()
res7: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster...
scala> readme.saveAsTextFile("hdfs://master01:9000/test")
2.JSON文件输入输出
如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
scala> import org.json4s._
import org.json4s._
scala> import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.JsonMethods._
scala> import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization
scala> var result = sc.textFile("examples/src/main/resources/people.json")
result: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.json MapPartitionsRDD[7] at textFile at <console>:47
scala> implicit val formats = Serialization.formats(ShortTypeHints(List()))
formats: org.json4s.Formats{val dateFormat: org.json4s.DateFormat; val typeHints: org.json4s.TypeHints} = org.json4s.Serialization$$anon$1@61f2c1da
scala> result.collect().foreach(x => {var c = parse(x).extract[Person];println(c.name + "," + c.age)})
Michael,30
Andy,30
Justin,19
如果JSON数据是跨行的,那么只能读入整个文件,然后对每个文件进行解析。
JSON数据的输出主要是通过在输出之前将由结构 化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去。
说白了还是以文本文件的形式存,只是文本的格式已经在程序中转换为JSON。
3.CSV文件输入输出
读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,然后通过将每一行进行解析实现对CSV的读取。
CSV/TSV数据的输出也是需要将结构化RDD通过相关的库转换成字符串RDD,然后使用 Spark 的文本文件 API 写出去。
4.SequenceFile文件输入输出
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。
Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[ keyClass, valueClass](path)。
scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> data.saveAsSequenceFile("hdfs://master01:9000/sequdata")
scala> val sdata = sc.sequenceFile[Int,String]("hdfs://master01:9000/sdata/p*")
sdata: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at sequenceFile at <console>:24
scala> sdata.collect()
res14: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))
可以直接调用 saveAsSequenceFile(path) 保存你的PairRDD,它会帮你写出数据。需要键和值能够自动转为Writable类型。
5.对象文件输入输出
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。
scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> data.saveAsObjectFile("hdfs://master01:9000/objfile")
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> val objrdd:RDD[(Int,String)] = sc.objectFile[(Int,String)]("hdfs://master01:9000/objfile/p*")
objrdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[28] at objectFile at <console>:25
scala> objrdd.collect()
res20: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))
6.Hadoop输入输出格式
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.
• 输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
• 键类型: 指定[K,V]键值对中K的类型
• 值类型: 指定[K,V]键值对中V的类型
• 分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits
其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.例如,对于textFile而言,只有path这个指定文件路径的参数,其他参数在系统内部指定了默认值
7.文件系统的输入输出
Spark 支持读写很多种文件系统, 像本地文件系统、Amazon S3、HDFS等。
8.数据库的输入输出
关系型数据库连接
支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:
8.1 Mysql读取与写入
8.1.1 Mysql读取:
def main (args: Array[String] ) {
val sparkConf = new SparkConf ().setMaster ("local[2]").setAppName ("JdbcApp")
val sc = new SparkContext (sparkConf)
val rdd = new org.apache.spark.rdd.JdbcRDD (
sc,
() => {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
java.sql.DriverManager.getConnection ("jdbc:mysql://master01:3306/rdd", "root", "hive")
},
"select * from rddtable where id >= ? and id <= ?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2)))
println (rdd.count () )
rdd.foreach (println (_) )
sc.stop ()
}
8.1.2 Mysql写入:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val data = sc.parallelize(List("Female", "Male","Female"))
data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://master01:3306/rdd", "root", "hive")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
JdbcRDD 接收这样几个参数。
• 首先,要提供一个用于对数据库创建连接的函数。这个函数让每个节点在连接必要的配 置后创建自己读取数据的连接。
• 接下来,要提供一个可以读取一定范围内数据的查询,以及查询参数中lowerBound和 upperBound 的值。这些参数可以让 Spark 在不同机器上查询不同范围的数据,这样就不 会因尝试在一个节点上读取所有数据而遭遇性能瓶颈。
• 这个函数的最后一个参数是一个可以将输出结果从转为对操作数据有用的格式的函数。如果这个参数空缺,Spark会自动将每行结果转为一个对象数组。
8.2 HBase数据库
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org.apache.hadoop.hbase.client.Result。
8.2.1 HBase读取:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
//HBase中的表名
conf.set(TableInputFormat.INPUT_TABLE, "fruit")
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = hBaseRDD.count()
println("hBaseRDD RDD Count:"+ count)
hBaseRDD.cache()
hBaseRDD.foreach {
case (_, result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
val color = Bytes.toString(result.getValue("info".getBytes, "color".getBytes))
println("Row key:" + key + " Name:" + name + " Color:" + color)
}
sc.stop()
}
8.2.2 HBase写入:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)
def convert(triple: (Int, String, Int)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
val localData = initialRDD.map(convert)
localData.saveAsHadoopDataset(jobConf)
}