由于 Spark 中的 save 方法要求目标目录不存在,而对于我编写的 Job,无法保证指定的存储目录一定不存在,故需要先将已存在的目录删除。
为此我封装了一套对 HDFS 进行操作的 Java API。
项目的 pom 文件:
需要的组件
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.8.1</version> </dependency>
代码如下:
package com.yaobaling.reportmail.hdfs.util;

import com.yaobaling.reportmail.config.BasicConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.net.URI;
import java.util.Properties;

/**
 * Thin wrapper around the Hadoop {@link FileSystem} API: list, view, upload,
 * download (file or whole directory) and delete paths on HDFS.
 *
 * <p>The target cluster is taken from the {@code hdfs.url} property of the
 * {@link Properties} object handed to the constructor.
 *
 * <p>Created by szh on 2018/4/11.
 *
 * @author szh
 * @date 2018/4/11
 */
public class HdfsUtil {

    /** Connected HDFS handle; initialized once in the constructor. */
    private FileSystem fileSystem;
    /** Value of the {@code hdfs.url} property, e.g. {@code hdfs://nn:8020}. */
    private String hdfsURL;

    /**
     * Connects to the HDFS cluster named by the {@code hdfs.url} property.
     *
     * @param hdfsConfig properties containing at least {@code hdfs.url}
     * @throws Exception if {@code hdfs.url} is missing or the connection fails
     */
    public HdfsUtil(Properties hdfsConfig) throws Exception {
        try {
            hdfsURL = hdfsConfig.getProperty("hdfs.url");
            if (StringUtils.isEmpty(hdfsURL)) {
                throw new Exception("hdfs.url is not set, please assign hdfs.url");
            }
            URI uri = new URI(hdfsURL);
            Configuration conf = new Configuration();
            // Pin the implementation class so shaded/partial classpaths still resolve hdfs://.
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            fileSystem = FileSystem.get(uri, conf);
        } catch (Exception e) {
            // Chain the original cause instead of printStackTrace() + a bare message,
            // so callers can still see why initialization failed.
            throw new Exception("error in init hdfs fileSystem, please check your config", e);
        }
    }

    /**
     * Lists the direct children of a directory.
     *
     * @param filePath HDFS path of the directory to list
     * @return one child path per line, each terminated by {@code \n}
     * @throws Exception if the path does not exist or listing fails
     */
    public String dir(String filePath) throws Exception {
        Path path = new Path(filePath);
        // StringBuilder: single-threaded use, no need for StringBuffer's locking.
        StringBuilder menuBuilder = new StringBuilder();
        for (FileStatus fs : fileSystem.listStatus(path)) {
            menuBuilder.append(fs.getPath()).append("\n");
        }
        return menuBuilder.toString();
    }

    /**
     * Prints the raw content of an HDFS file to stdout.
     *
     * <p>NOTE(review): reads byte-by-byte and casts each byte to {@code char},
     * so only single-byte encodings render correctly — fine for the CSV/debug
     * use this class targets, garbled for multibyte text.
     *
     * @param filePath HDFS path of the file to print
     * @throws Exception if the file cannot be opened or read
     */
    public void viewFile(String filePath) throws Exception {
        Path path = new Path(filePath);
        // try-with-resources: the original leaked the stream when read() threw.
        try (FSDataInputStream fsDataInputStream = fileSystem.open(path)) {
            int c;
            while ((c = fsDataInputStream.read()) != -1) {
                System.out.print((char) c);
            }
        }
    }

    /**
     * Uploads a local file to HDFS, keeping the local copy.
     *
     * @param sourceFilePath local source path
     * @param destFilePath   HDFS destination path
     * @throws Exception if the copy fails
     */
    public void upload(String sourceFilePath, String destFilePath) throws Exception {
        Path srcPath = new Path(sourceFilePath);
        Path dstPath = new Path(destFilePath);
        // delSrc=false: do not remove the local source after upload.
        fileSystem.copyFromLocalFile(false, srcPath, dstPath);
    }

    /**
     * Downloads an HDFS path to the local filesystem; dispatches to the
     * single-file or recursive-directory variant as appropriate.
     *
     * @param sourceFilePath HDFS source path (file or directory)
     * @param destFilePath   local destination path
     * @throws Exception if any copy fails
     */
    public void download(String sourceFilePath, String destFilePath) throws Exception {
        if (fileSystem.isFile(new Path(sourceFilePath))) {
            downloadFile(sourceFilePath, destFilePath);
        } else {
            downloadFolder(sourceFilePath, destFilePath);
        }
    }

    /**
     * Downloads a single HDFS file to a local file.
     *
     * @param sourceFilePath HDFS source file
     * @param destFilePath   local destination file
     * @throws Exception if the copy fails
     */
    public void downloadFile(String sourceFilePath, String destFilePath) throws Exception {
        // try-with-resources replaces the manual try/finally + IOUtils.closeStream pair.
        try (FSDataInputStream in = fileSystem.open(new Path(sourceFilePath));
             FileOutputStream out = new FileOutputStream(destFilePath)) {
            IOUtils.copyBytes(in, out, 4096, false);
        }
    }

    /**
     * Recursively downloads an HDFS directory into a local directory,
     * creating the local directory if necessary.
     *
     * @param sourceFilePath HDFS source directory
     * @param destFilePath   local destination directory
     * @throws Exception if listing or any copy fails
     */
    public void downloadFolder(String sourceFilePath, String destFilePath) throws Exception {
        File dstDir = new File(destFilePath);
        if (!dstDir.exists() && !dstDir.mkdirs()) {
            throw new Exception("cannot create local directory: " + destFilePath);
        }
        FileStatus[] srcFileStatus = fileSystem.listStatus(new Path(sourceFilePath));
        for (Path srcPath : FileUtil.stat2Paths(srcFileStatus)) {
            // Path.getName() replaces the hand-rolled lastIndexOf('/') parsing.
            String fileName = srcPath.getName();
            download(sourceFilePath + '/' + fileName, destFilePath + '/' + fileName);
        }
    }

    /**
     * Recursively deletes an HDFS path (file or directory).
     *
     * @param filePath HDFS path to delete
     * @throws Exception if the delete fails
     */
    public void delete(String filePath) throws Exception {
        Path path = new Path(filePath);
        // recursive=true: directories are removed with their contents.
        fileSystem.delete(path, true);
    }

    /** Ad-hoc smoke test against the cluster named in the project config. */
    public static void main(String[] args) throws Exception {
        BasicConfig basicConfig = BasicConfig.getInstance();
        HdfsUtil hdfsUtil = new HdfsUtil(basicConfig.getHdfsConfig());

        System.out.println(hdfsUtil.dir("/"));

        // hdfsUtil.upload("E:\\\\report.tgz","/");
        // hdfsUtil.delete("/report.tgz");

        hdfsUtil.download("/bigdata/aiqianjin/csv/2018-04-11/",
                "E:\\\\download\\");
    }
}