Hadoop 1 already provided a way to get the file name: in the map phase you can find out which input file each record belongs to. The code is as follows:
Java code:

// Get the name of the file that the current record was read from
InputSplit inputSplit = context.getInputSplit();
String filename = ((FileSplit) inputSplit).getPath().getName();
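In Spark the same information is available by casting the RDD returned by sc.hadoopFile to the underlying HadoopRDD and calling its mapPartitionsWithInputSplit method, which hands each partition its InputSplit: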
Scala code:
object Mytest3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("mytest")
    System.setProperty("hadoop.home.dir", "E:\\hadoop2\\hadoop-2.6.0")
    val sc = new SparkContext(conf)
    // Read with the old (mapred) API so that the underlying RDD is a HadoopRDD
    val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat]("C:\\sparksplit\\*")
    val hadoopRdd = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]
    // mapPartitionsWithInputSplit exposes each partition's InputSplit,
    // from which the file path can be read
    val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit(
      (inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
        val file = inputSplit.asInstanceOf[FileSplit]
        iterator.map(x => file.getPath.toString + "\t" + x._2)
      })
    fileAndLine.foreach(println)
    sc.stop()
  }
}
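The split-aware iterator can feed any further transformation. As a minimal sketch (reusing hadoopRdd from above; the per-file line count is an illustrative addition, not part of the original program), counting how many lines each file contributed:

// Count the lines contributed by each input file (illustrative example)
val linesPerFile = hadoopRdd.mapPartitionsWithInputSplit(
    (split: InputSplit, it: Iterator[(LongWritable, Text)]) => {
      val path = split.asInstanceOf[FileSplit].getPath.toString
      it.map(_ => (path, 1L))
    })
  .reduceByKey(_ + _)
linesPerFile.foreach(println)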
The imports used by the Scala code above:
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.HadoopRDD
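For reference, the same technique also works with the new Hadoop API (org.apache.hadoop.mapreduce): NewHadoopRDD exposes an analogous mapPartitionsWithInputSplit. A minimal sketch, assuming the same SparkContext sc and input path as above:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD

// Read with the new (mapreduce) API; the underlying RDD is a NewHadoopRDD
val newFileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("C:\\sparksplit\\*")
val newHadoopRdd = newFileRDD.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
val fileAndLine2 = newHadoopRdd.mapPartitionsWithInputSplit(
    (split: InputSplit, it: Iterator[(LongWritable, Text)]) => {
      val path = split.asInstanceOf[FileSplit].getPath.toString
      it.map { case (_, line) => path + "\t" + line }
    })
fileAndLine2.foreach(println)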