1. 代码:
package spark.examples.fileformat

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Round-trip demo: writes a small (String, Int) pair RDD out as a Hadoop
 * SequenceFile and reads it back, printing the recovered pairs.
 *
 * NOTE(review): requires Spark 1.3+ on the classpath — the implicit
 * WritableConverters moved into SparkContext's companion scope there, so
 * `saveAsSequenceFile` / `sequenceFile` resolve without an explicit
 * `import SparkContext._`.
 */
object SequenceFileTest {

  // `: Unit =` added — Scala's procedure syntax (`def main(...) { }`) is deprecated.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("SequenceFileTest")
    conf.setMaster("local[3]")
    val sc = new SparkContext(conf)
    try {
      val data = List(("ABC", 1), ("BCD", 2), ("CDE", 3), ("DEF", 4), ("FGH", 5))
      // Single partition so the output is exactly one part file.
      val rdd = sc.parallelize(data, 1)
      // Timestamped directory avoids "output directory already exists" failures on re-runs.
      val dir = "file:///D:/sequenceFile-" + System.currentTimeMillis()
      rdd.saveAsSequenceFile(dir)
      // Read the whole output directory instead of hard-coding "/part-00000":
      // identical result with one partition, and it keeps working if the
      // partition count above ever changes.
      val rdd2 = sc.sequenceFile[String, Int](dir)
      println(rdd2.collect().map { case (k, v) => s"$k, $v" }.toList)
    } finally {
      // Always release the local SparkContext, even if the job fails.
      sc.stop()
    }
  }
}
2. SequenceFile的内容:
3.注意:
saveAsSequenceFile是SequenceFileRDDFunctions定义的方法,但是在上面的代码中并没有显式地指定隐式转换,原因是上面的代码运行于Spark 1.3中,在SparkContext中有如下的注释解释了这种行为
// The following implicit functions were in SparkContext before 1.3 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find // them automatically. However, we still keep the old functions in SparkContext for backward // compatibility and forward to the following functions directly. implicit def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) implicit def longWritableConverter(): WritableConverter[Long] = simpleWritableConverter[Long, LongWritable](_.get) implicit def doubleWritableConverter(): WritableConverter[Double] = simpleWritableConverter[Double, DoubleWritable](_.get) implicit def floatWritableConverter(): WritableConverter[Float] = simpleWritableConverter[Float, FloatWritable](_.get) implicit def booleanWritableConverter(): WritableConverter[Boolean] = simpleWritableConverter[Boolean, BooleanWritable](_.get) implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = { simpleWritableConverter[Array[Byte], BytesWritable] { bw => // getBytes method returns array which is longer then data to be returned Arrays.copyOfRange(bw.getBytes, 0, bw.getLength) } } implicit def stringWritableConverter(): WritableConverter[String] = simpleWritableConverter[String, Text](_.toString) implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] = new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) }
而SequenceFileRDDFunctions是针对KV都是继承自Writable的PairRDD
/** * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile, * through an implicit conversion. Note that this can't be part of PairRDDFunctions because * we need more implicit parameters to convert our keys and values to Writable. * */ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag]( self: RDD[(K, V)], _keyWritableClass: Class[_ <: Writable], _valueWritableClass: Class[_ <: Writable]) extends Logging with Serializable {