This Spark job lists the Sogou dictionary files stored on HDFS, keeps only the words from each file (dropping the pinyin column), tags every word with its first- and second-level category taken from the directory layout, merges the categories of duplicate words, and writes the merged dictionary back to HDFS.

package com.rlt.dao

import java.util.Date

import com.rlt.utils.{HdfsUtil, PropertiesUtils}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object LoadingDict {
  def main(args: Array[String]): Unit = {
    val startTime = new Date().getTime

    // Read the configuration file
    val properties = PropertiesUtils.getProperties("spark-conf.properties")
    val sparkMaster = properties.get("spark.master")
    val sparkMemory = properties.get("spark.executor.memory")
    val sparkDriverMemory = properties.get("spark.driver.memory")
    val sparkCores = properties.get("spark.executor.cores")
    val coreNums = properties.get("spark.cores.max")
    val dictPath = properties.get("dictPath")
    val savePath = properties.get("savePath")
    val hdfsPath = properties.get("hdfsPath")
    val parallelism = properties.get("spark.default.parallelism")
    val executorNums = properties.get("spark.num.executors")

    System.setProperty("user.name", "root")
    System.setProperty("HADOOP_USER_NAME", "root")

    // Configure the Spark launch
    val conf = new SparkConf()
      .setAppName("LoadingDict")
      .setMaster(sparkMaster.toString)
      .set("spark.executor.memory", sparkMemory.toString)
      .set("spark.driver.memory", sparkDriverMemory.toString)
      .set("spark.executor.cores", sparkCores.toString)
      .set("spark.cores.max", coreNums.toString)
      .set("spark.default.parallelism", parallelism.toString)
      .set("spark.num.executors", executorNums.toString)
      .setJars(List("out/artifacts/UserPortrait_jar/UserPortrait.jar"))

    // Open the Spark session
    val spark = SparkSession.builder().config(conf).getOrCreate()
    println("Spark job started")

    // List the files under the dictionary directory on HDFS
    val hdfsUtil = new HdfsUtil()
    val seq = hdfsUtil.getFileList(hdfsPath.toString + "/sougoudict")

    // Delete the output directories so saveAsTextFile can recreate them
    HdfsUtil.deleteDir(hdfsPath.toString + "/sougoudict2")
    HdfsUtil.deleteDir(savePath.toString)

    var rdd: RDD[(String, String)] = null
    // Iterate over the file list and load each dictionary into an RDD
    for (dict <- seq.toArray()) {
      val file = dict.toString
      println(file)
      try {
        // Drop the 4-character extension (e.g. ".txt"); the category names
        // come from the last two components of the file path
        val path = file.substring(0, file.length() - 4)
        val arr = path.split("/")
        val ejfl = arr(arr.length - 1) // second-level category
        val yjfl = arr(arr.length - 2) // first-level category
        // Split each line, keep only the word (drop the pinyin), and deduplicate
        rdd = spark.sparkContext.textFile(file)
          .filter(line => line != "")
          .map { line =>
            val str = line.split(' ')
            (str(1), yjfl + "-" + ejfl)
          }.distinct()
        // Save the per-dictionary RDD to HDFS
        rdd.saveAsTextFile(hdfsPath.toString + "/sougoudict2/" + yjfl + "-" + ejfl)
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }

    // Read all processed dictionaries back into one RDD; each saved line looks
    // like "(word,category)", so strip the surrounding parentheses, split on the
    // comma, deduplicate, and merge the categories of identical words
    val resultRdd = spark.sparkContext.textFile(hdfsPath.toString + "/sougoudict2/*").map { line =>
      val str = line.substring(1, line.length - 1).split(',')
      (str(0), str(1))
    }.distinct().reduceByKey((a, b) => a + " " + b)

    // Save the merged dictionary to HDFS
    resultRdd.saveAsTextFile(savePath.toString)

    val endTime = new Date().getTime
    println("Elapsed: " + (endTime - startTime) / 1000 + " seconds")

    spark.stop()
    println("Spark job finished")
  }
}
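For reference, here is a minimal sketch of the spark-conf.properties file this job expects. The keys are exactly the ones read through PropertiesUtils above; the values shown are placeholders to replace with your own cluster settings and paths.

# Spark resource settings (values are illustrative placeholders)
spark.master=spark://master:7077
spark.executor.memory=4g
spark.driver.memory=2g
spark.executor.cores=2
spark.cores.max=8
spark.default.parallelism=16
spark.num.executors=4

# Paths used by the job (illustrative placeholders)
dictPath=/data/dict
hdfsPath=hdfs://master:9000/userportrait
savePath=hdfs://master:9000/userportrait/dict_result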
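The HdfsUtil helper from com.rlt.utils is not shown in this post. Below is a plausible minimal sketch built on the standard Hadoop FileSystem API, covering only the two calls the main program makes: the instance method getFileList (returning a java.util.List, which matches the seq.toArray() call above) and the companion-object method deleteDir. Everything beyond those two signatures is an assumption, not the project's actual implementation.

package com.rlt.utils

import java.util.{ArrayList, List => JList}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical reconstruction of the project's HdfsUtil helper,
// matching only the two calls made in LoadingDict.
class HdfsUtil {
  // List the full paths of all files directly under `dir`
  def getFileList(dir: String): JList[String] = {
    val fs = FileSystem.get(new Configuration())
    val result = new ArrayList[String]()
    fs.listStatus(new Path(dir)).foreach(s => result.add(s.getPath.toString))
    result
  }
}

object HdfsUtil {
  // Recursively delete `dir` if it exists, so saveAsTextFile can recreate it
  def deleteDir(dir: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    val path = new Path(dir)
    if (fs.exists(path)) fs.delete(path, true)
  }
}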