spark上传文件和追加文件到hdfs

一、代码实现

package com.xtd.hdfs

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object HDFSUtils {

  def main(args: Array[String]): Unit = {
    val status = uploadFile("C:\\Users\\com\\Desktop\\测试数据\\","/test/file/","报销单.docx")
    if(status) println("上传成功!") else println("上传失败")
  }

  /**
   * 本地文件上传到 hdfs
   * @param localDirectory 本地目录
   * @param hdfsDirectory  hdfs目录
   * @param fileName       文件名称
   * @return true:上传成功  flase:上传失败
   */
  def uploadFile(localDirectory:String,hdfsDirectory:String,fileName:String): Boolean = {

    val configuration:Configuration = new Configuration()
    val fileSystem:FileSystem = FileSystem.get(configuration)

    val localFullPath = localDirectory+"/"+fileName
    val hdfsFullPath = hdfsDirectory+"/"+fileName

    val localPath = new Path(localFullPath)
    val hdfspath = new Path(hdfsDirectory)
    val hdfsfilepath = new Path(hdfsFullPath)

    val status1 = new File(localFullPath).isFile
    val status2 = fileSystem.isDirectory(hdfspath)
    val status3 = fileSystem.exists(hdfsfilepath)
    println(status1,status2,!status3)

    // 本地文件存在,hdfs目录存在,hdfs文件不存在(防止文件覆盖)
    if(status1 && status2 && !status3) {
      fileSystem.copyFromLocalFile(false,false,localPath,hdfsfilepath)
      return true
    }
    false
  }

}

二、代码说明

这里做了三个判断

status1:本地文件存在
status2:hdfs目录存在
status3:hdfs文件不存在

查看源码,删除代码就一个 copyFromLocalFile方法,为啥写这么复杂呢??

    public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
        Configuration conf = this.getConf();
        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
    }

原因一:FileSystem类 的 copyFromLocalFile方法没有返回值,但是业务需要知道上传状态

原因二:copyFromLocalFile方法的 hdfs Path 可以写上传的目录也可以写成 目录+文件名

但是,如果本来输入的 hdfs是目录,但是由于这个路径不存在,copyFromLocalFile方法会把

最后一个目录的当成文件的名称当成文件名上传至hdfs,文件名后缀没了,而且容易造成混乱

三、运行效果

四、写入文件 

hadoop不推荐追加文件到hdfs,如果需要追加文件有两个思路

1、先把内容追加到本地文件,再从本地上传到 hdfs(大数据场景下推荐使用)

2、用集合或者String数组先把追加的缓存,最后再一次性追加到hdfs (小数据或系统内存大的场景下)

hadoop 默认关闭hdfs文件追加功能,开启需要配置 hdfs-site.xml 文件

<property>
	<name>dfs.support.append</name>
	<value>true</value>
</property>

实现代码 

val configuration:Configuration = new Configuration()
val fileSystem:FileSystem = FileSystem.get(configuration)
val path:Path = new Path("xxx")
fileSystem.append(path)

猜你喜欢

转载自blog.csdn.net/qq262593421/article/details/107042815