First, the WordCount code; uaa is the package that holds the Hadoop utility class!
package com.lxw.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import uaa.HdfsOperation;

public class WordCount2 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("wordcount");
        conf.setMaster("spark://tarzan:7077");
        HdfsOperation hdfs = new HdfsOperation();
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("hdfs://tarzan:9000/Hadoop/Input/wordcount.txt");

        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Append each "word------count" line to a file on HDFS
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                String a = wordCount._1 + "------" + wordCount._2 + "\n";
                HdfsOperation.init();
                HdfsOperation.appendHDFSFile("/test/test.txt", a);
            }
        });

        sc.close();
    }
}
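As a side note, on Java 8 with Spark 2.x the same transformation chain can be written with lambdas instead of anonymous classes. This is only a sketch for comparison, not the version I actually ran:

// Sketch only: the same pipeline with Java 8 lambdas (assumes Spark 2.x, where flatMap expects an Iterator)
JavaRDD<String> words2 = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs2 = words2.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> wordCounts2 = pairs2.reduceByKey((v1, v2) -> v1 + v2);
wordCounts2.foreach(t -> System.out.println(t._1 + "------" + t._2));

Next, the HdfsOperation utility class referenced above: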
package uaa;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * Created by Administrator on 2017/5/25.
 */
public class HdfsOperation {

    public static void main(String[] args) throws Exception {
        init();
        // appendHDFSFile("/test/test.txt", "123");
        mkfile("/test/test.txt");
    }

    private static FileSystem hdfs;

    /**
     * Obtain the HDFS connection (a FileSystem instance).
     */
    public static void init() throws URISyntaxException, IOException, InterruptedException {
        // There are several ways to obtain a FileSystem; only one is shown here
        Configuration config = new Configuration();
        URI uri = new URI("hdfs://tarzan:9000");
        hdfs = FileSystem.get(uri, config, "root"); // arguments: uri, config, login user
    }

    /**
     * Append a string to an existing HDFS file.
     */
    public static void appendHDFSFile(String newFile, String content) throws IOException {
        if (StringUtils.isBlank(newFile) || null == content) {
            return;
        }
        InputStream in = null;
        OutputStream out = null;
        try {
            in = new BufferedInputStream(new ByteArrayInputStream(content.getBytes()));
            out = hdfs.append(new Path(newFile));
            IOUtils.copy(in, out);
            out.close();
            hdfs.close();
            in.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(in);
            IOUtils.closeQuietly(out);
        }
    }

    /**
     * Check whether a file or directory exists.
     */
    public static boolean checkFileExist(String filename) {
        try {
            Path f = new Path(filename);
            return hdfs.exists(f);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Create a directory.
     */
    public static boolean mkdir(String dirName) {
        if (checkFileExist(dirName))
            return true;
        try {
            Path f = new Path(dirName);
            System.out.println("Create and Write :" + f.getName() + " to hdfs");
            return hdfs.mkdirs(f);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Create an empty file.
     *
     * @param filePath full path of the file
     */
    public static boolean mkfile(String filePath) {
        try {
            Path f = new Path(filePath);
            FSDataOutputStream os = hdfs.create(f, true);
            os.close();
            return true;
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Copy a file to the given destination.
     *
     * @param srcfile source path
     * @param desfile destination path
     */
    public static boolean hdfsCopyUtils(String srcfile, String desfile) {
        Configuration conf = new Configuration();
        Path src = new Path(srcfile);
        Path dst = new Path(desfile);
        try {
            FileUtil.copy(src.getFileSystem(conf), src, dst.getFileSystem(conf), dst, false, conf);
        } catch (IOException e) {
            return false;
        }
        return true;
    }

    /**
     * Move a file or directory.
     *
     * @param src source path
     * @param dst destination path
     */
    public static void movefile(String src, String dst) throws Exception {
        Path p1 = new Path(src);
        Path p2 = new Path(dst);
        hdfs.rename(p1, p2);
    }

    /**
     * Delete a file or directory.
     */
    public static void delete(String src) throws Exception {
        Path p1 = new Path(src);
        if (hdfs.isDirectory(p1)) {
            hdfs.delete(p1, true);
            System.out.println("Deleted directory: " + src);
        } else if (hdfs.isFile(p1)) {
            hdfs.delete(p1, false);
            System.out.println("Deleted file: " + src);
        }
    }

    /**
     * Copy a local file (expected to be UTF-8) into HDFS.
     */
    public static boolean copyLocalFileToHDFS(String localFilename, String hdfsPath) {
        try {
            // Create the target directory if it does not exist yet
            mkdir(hdfsPath);
            File file = new File(localFilename);
            FileInputStream is = new FileInputStream(file);
            // If the file already exists on HDFS, delete it first
            if (checkFileExist(hdfsPath + "/" + file.getName())) {
                delete(hdfsPath + "/" + file.getName());
            }
            Path f = new Path(hdfsPath + "/" + file.getName());
            FSDataOutputStream os = hdfs.create(f, true);
            byte[] buffer = new byte[10240000];
            int nCount = 0;
            while (true) {
                int bytesRead = is.read(buffer);
                if (bytesRead <= 0) {
                    break;
                }
                os.write(buffer, 0, bytesRead);
                nCount++;
                if (nCount % (100) == 0)
                    System.out.println((new Date()).toLocaleString() + ": Have move " + nCount + " blocks");
            }
            is.close();
            os.close();
            System.out.println((new Date()).toLocaleString() + ": Write content of file " + file.getName()
                    + " to hdfs file " + f.getName() + " success");
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Copy a local directory into HDFS recursively.
     */
    public static boolean CopyLocalDirTohdfs(String localPath, String hdfsPath) {
        try {
            File root = new File(localPath);
            File[] files = root.listFiles();
            for (File file : files) {
                if (file.isFile()) {
                    copyLocalFileToHDFS(file.getPath().toString(), hdfsPath);
                } else if (file.isDirectory()) {
                    CopyLocalDirTohdfs(localPath + "/" + file.getName(), hdfsPath + "/" + file.getName());
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Download a file from HDFS to the local filesystem.
     */
    public static boolean downloadFileFromHdfs(String hdfsFilename, String localPath) {
        try {
            Path f = new Path(hdfsFilename);
            FSDataInputStream dis = hdfs.open(f);
            File file = new File(localPath + "/" + f.getName());
            FileOutputStream os = new FileOutputStream(file);
            byte[] buffer = new byte[1024000];
            int length = 0;
            while ((length = dis.read(buffer)) > 0) {
                os.write(buffer, 0, length);
            }
            os.close();
            dis.close();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * HDFS-to-HDFS merge using FileUtil.copyMerge(). Note the "false" argument below:
     * if changed to true, the source directory is deleted after merging.
     *
     * @param folder directory to merge
     * @param file   full path of the merged output file
     */
    public static void copyMerge(String folder, String file) {
        Configuration conf = new Configuration();
        Path src = new Path(folder);
        Path dst = new Path(file);
        try {
            FileUtil.copyMerge(src.getFileSystem(conf), src, dst.getFileSystem(conf), dst, false, conf, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * List the names of all DataNodes in the cluster.
     */
    public void listDataNodeInfo() {
        try {
            DistributedFileSystem fs = (DistributedFileSystem) hdfs;
            DatanodeInfo[] dataNodeStats = fs.getDataNodeStats();
            String[] names = new String[dataNodeStats.length];
            System.out.println("List of all the datanode in the HDFS cluster:");
            for (int i = 0; i < names.length; i++) {
                names[i] = dataNodeStats[i].getHostName();
                System.out.println(names[i]);
            }
            System.out.println(hdfs.getUri().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Check whether the NameNode we are talking to is a standby node.
     */
    public boolean checkStandbyException(String filename) {
        try {
            Path f = new Path(filename);
            hdfs.exists(f);
        } catch (org.apache.hadoop.ipc.RemoteException e) {
            if (e.getClassName().equals("org.apache.hadoop.ipc.StandbyException")) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Merge a list of files into one target file.
     *
     * @param fileList         files to merge
     * @param tarPath          target file path
     * @param rowTerminateFlag row separator, e.g. "\n"
     */
    public boolean mergeDirFiles(List<FileStatus> fileList, String tarPath, String rowTerminateFlag) {
        FSDataOutputStream tarFileOutputStream = null;
        FSDataInputStream srcFileInputStream = null;
        try {
            Path tarFile = new Path(tarPath);
            tarFileOutputStream = hdfs.create(tarFile, true);
            byte[] buffer = new byte[1024000];
            int length = 0;
            long nTotalLength = 0;
            int nCount = 0;
            boolean bfirst = true;
            for (FileStatus file : fileList) {
                if (file.getPath().equals(tarFile)) {
                    continue;
                }
                System.out.println(" merging file from " + file.getPath() + " to " + tarPath);
                if (!bfirst) {
                    // add the row separator between files
                    tarFileOutputStream.write(rowTerminateFlag.getBytes(), 0, rowTerminateFlag.length());
                }
                srcFileInputStream = hdfs.open(file.getPath(), buffer.length);
                while ((length = srcFileInputStream.read(buffer)) > 0) {
                    nCount++;
                    tarFileOutputStream.write(buffer, 0, length);
                    nTotalLength += length;
                    if (nCount % 1000 == 0) {
                        tarFileOutputStream.flush();
                        System.out.println((new Date()).toLocaleString() + ": Have move "
                                + (nTotalLength / 1024000) + " MB");
                    }
                }
                srcFileInputStream.close();
                bfirst = false;
            }
        } catch (Exception e) {
            e.printStackTrace();
            try {
                delete(tarPath);
            } catch (Exception e2) {
                e2.printStackTrace();
            }
            return false;
        } finally {
            try {
                if (tarFileOutputStream != null) {
                    tarFileOutputStream.flush();
                    tarFileOutputStream.close();
                    srcFileInputStream.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return true;
    }

    /**
     * Write a string to the given path.
     *
     * @param text string to save
     * @param path destination path
     */
    public static void writerString(String text, String path) {
        try {
            Path f = new Path(path);
            FSDataOutputStream os = hdfs.create(f, true);
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(os, "utf-8")); // write as UTF-8 to avoid garbled text
            writer.write(text);
            writer.close();
            os.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Read a file line by line, guarding against garbled characters.
     */
    public static boolean readByLine(String hdfsFilename) {
        try {
            Path f = new Path(hdfsFilename);
            FSDataInputStream dis = hdfs.open(f);
            BufferedReader bf = new BufferedReader(new InputStreamReader(dis)); // avoid garbled Chinese
            String line = null;
            while ((line = bf.readLine()) != null) {
                System.out.println(new String(line.getBytes(), "utf-8"));
            }
            dis.close();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * If the source path already exists, rename it with a timestamp suffix.
     *
     * @param srcPath source path
     * @param tarPath target path
     */
    public static void reNameExistsPath(String srcPath, String tarPath) throws Exception {
        // Check whether the output path exists; if so, rename it
        if (checkFileExist(srcPath)) {
            tarPath = srcPath.trim();
            while (tarPath.charAt(tarPath.length() - 1) == '/') {
                tarPath = tarPath.substring(0, tarPath.length() - 1);
            }
            Date now = new Date();
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyMMddHHmmss");
            String nowStr = dateFormat.format(now);
            tarPath += "_" + nowStr;
            movefile(srcPath, tarPath);
        } else {
            tarPath = srcPath;
        }
    }
}
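For reference, here is a minimal standalone usage sketch of the utility class. The class name HdfsOperationDemo and the paths are made-up examples, and it assumes the NameNode at hdfs://tarzan:9000 is reachable:

import uaa.HdfsOperation;

// Hypothetical standalone test of HdfsOperation; paths are examples only
public class HdfsOperationDemo {
    public static void main(String[] args) throws Exception {
        HdfsOperation.init();                                         // connect to hdfs://tarzan:9000 as user "root"
        HdfsOperation.mkdir("/test");                                 // make sure the directory exists
        HdfsOperation.writerString("hello hdfs\n", "/test/demo.txt"); // write a UTF-8 string
        HdfsOperation.readByLine("/test/demo.txt");                   // print it back line by line
    }
}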
Running the WordCount job, however, throws a class cast error. crying~~
18/06/01 16:44:55 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 192.168.17.248, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
...
At first I did not use the HDFS utility at all and simply printed the results to the console with System.out. That only works without errors when the master is local; as soon as the master is set to spark://tarzan:7077, the error above appears. After an endless hunt for the cause, I found that Eclipse can only run and debug this kind of job in local mode; it cannot debug it against the real cluster (the ClassCastException above is commonly attributed to the executors not being able to load the application's classes, since the jar is never shipped to them). So I added the HDFS utility class to append the results to HDFS, packaged the program into a jar, dropped it into spark/bin, and ran it with spark-submit:
./spark-submit --master spark://tarzan:7077 --class com.lxw.test.WordCount2 /usr/local/WordCount2.jar
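An alternative I have seen suggested for this error, but did not verify myself, is to tell Spark which jar contains the application classes right in the driver, so that the executors can load them even when the job is launched from the IDE. A minimal sketch, assuming the jar has already been built at the path shown:

// Untested sketch: ship the application jar to the executors when launching from the IDE
SparkConf conf = new SparkConf()
        .setAppName("wordcount")
        .setMaster("spark://tarzan:7077")
        .setJars(new String[] { "/usr/local/WordCount2.jar" }); // path to the packaged jar (example)

Either way, once the job finishes, the counts appended by the program should be visible in /test/test.txt on HDFS.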