参考
http://hanyingjun318.iteye.com/blog/2277512
环境
idea;sbt;
hadoop
在hadoop中读取文件名
InputSplit inputSplit=(InputSplit)context.getInputSplit();
String filename=((FileSplit)inputSplit).getPath().getName();
spark
spark代码,在本地测试有效,mr有mapred和mapreduce两种
mapreduce
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
object testPath {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("testtoParquet")
conf.setMaster("local")
val sc = new SparkContext(conf)
var input = "/home/dwj/data/testSpark/20180409"
var output = ""
val fileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](input)
val hadoopRDD = fileRDD.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
val fileAdnLine = hadoopRDD.mapPartitionsWithInputSplit((inputSplit:InputSplit, iterator:Iterator[(LongWritable, Text)]) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(x => {file.getPath.toString()+"\t" + x._2})
})
fileAdnLine.foreach(println)
}
}
mapred
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.HadoopRDD
object testPathOld {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("testtoParquet")
conf.setMaster("local")
val sc = new SparkContext(conf)
var input = "/home/dwj/data/testSpark/20180409"
val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]
val fileAdnLine = hadoopRDD.mapPartitionsWithInputSplit((inputSplit:InputSplit,iterator:Iterator[(LongWritable, Text)]) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(x => {file.getPath.toString()+"\t" + x._2})
})
fileAdnLine.foreach(println)
}
}