python 之pyhdfs 对hdfs 进行操作

#################################################################
#################################################################
#################################################################
#### 615 明辉科技### 针对所有子项目开发的公共 utils,不具有业务属性
#### 作者: 曹明杰
#---------------------针对hdfs------------------------#


# !coding:utf-8
import  os
from pyhdfs import HdfsClient, HdfsFileNotFoundException
from brushtickes.com.mh.brush.brushutils \
    import brush_util as bu


# 关于python操作hdfs的API可以查看官网:
# https://hdfscli.readthedocs.io/en/latest/api.html


# 读取hdfs文件内容,将数据放入到本地的目录的文件中
#并将读取的数据返回
# Read an HDFS file, optionally append its contents to a local file,
# and return the data that was read.
def read_hdfs_file(filename, local_path='', **kwargs):
    """Read the contents of an HDFS file and return them.

    If *local_path* is non-empty, the data is also appended to that
    local file.  The open mode is chosen from the payload type
    (binary for ``bytes``, text otherwise).

    :param filename: HDFS path of the file to read.
    :param local_path: optional local file path to append the data to.
    :param kwargs: forwarded to ``HdfsClient.open`` — e.g. ``offset``
        (starting byte position, long), ``length`` (number of bytes,
        long), ``buffersize`` (transfer buffer size, int).
    :return: the response payload (``.data`` of the HTTP response).
    """
    response = get_client().open(filename, **kwargs)
    data = response.data
    print(data)
    if local_path:
        # Fix: the old code opened the file in text mode, caught the
        # TypeError raised when writing bytes, and re-opened in binary
        # mode — leaking both file handles.  Pick the mode up front and
        # let ``with`` close the handle deterministically.
        mode = 'ab+' if isinstance(data, bytes) else 'a+'
        with open(local_path, mode) as fh:
            fh.write(data)
    return data

# 创建目录
# Create a directory (including missing parents) on HDFS.
def mkdirs(hdfs_path):
    """Create *hdfs_path* on HDFS, parents included."""
    client = get_client()
    client.mkdirs(hdfs_path)


# 删除hdfs文件
# Remove a file from HDFS.
def delete_hdfs_file(hdfs_path):
    """Delete the file at *hdfs_path* on HDFS."""
    client = get_client()
    client.delete(hdfs_path)


# 上传文件到hdfs
# Upload a local file to HDFS without marking the local file.
def put_to_hdfs_no_flag(local_path, hdfs_path):
    """Copy *local_path* up to *hdfs_path* on HDFS.

    Unlike :func:`put_to_hdfs_flag`, the local file is left untouched.

    :param local_path: local source file path.
    :param hdfs_path: HDFS destination path.
    """
    # Fix: the arguments were swapped (copy_from_local takes the local
    # source first, then the HDFS destination — see put_to_hdfs_flag),
    # and the assignment was duplicated (`upload = upload = ...`).
    upload = get_client().copy_from_local(local_path, hdfs_path)
    # Fix: the log message/option tag was copy-pasted from the
    # flag-writing variant; tag this function correctly.
    print_base_log(upload, '上传文件到hdfs', 'put_to_hdfs_no_flag')


# 上传文件到hdfs,并把本地上传的文件标志位 success 已经完成
# Upload a local file to HDFS, then rename the local file with a
# "success_" prefix plus a timestamp to mark it as uploaded.
def put_to_hdfs_flag(local_path, hdfs_path):
    """Copy *local_path* to *hdfs_path*, then flag the local file.

    :param local_path: local source file path.
    :param hdfs_path: HDFS destination path.
    """
    upload = get_client().copy_from_local(local_path, hdfs_path)
    # Fix: prefixing the WHOLE path ("success_" + "/a/b/f") produced a
    # nonexistent target directory; prefix only the file name.
    directory, basename = os.path.split(local_path)
    flagged = os.path.join(directory, "success_" + basename + str(bu.get_new_time()))
    os.rename(local_path, flagged)
    print_base_log(upload, '上传文件到hdfs,并把本地上传的文件标志位 success 已经完成', 'put_to_hdfs_flag')

# 从hdfs获取文件到本地
# Download a file from HDFS to the local filesystem.
def get_from_hdfs(hdfs_path, local_path):
    """Copy *hdfs_path* from HDFS down to *local_path*."""
    client = get_client()
    client.copy_to_local(hdfs_path, local_path)


# 追加数据到hdfs文件
# Append data to an existing HDFS file.
def append_to_hdfs(hdfs_path, data):
    """Append *data* to the file at *hdfs_path* on HDFS."""
    client = get_client()
    client.append(hdfs_path, data)


# 覆盖数据写到hdfs文件
# Overwrite the contents of an HDFS file with new data.
def write_to_hdfs(hdfs_path, data):
    """Write *data* to *hdfs_path*, replacing any existing contents.

    :param hdfs_path: HDFS destination path.
    :param data: bytes/str payload to write.
    """
    # Fix: the old code built three separate clients and used
    # delete-then-append — but WebHDFS APPEND requires the file to
    # already exist, so the file could never be recreated after the
    # delete.  CREATE with overwrite=True does the whole job.
    client = get_client()
    client.create(hdfs_path, data, overwrite=True)



# 移动或者修改文件
# Move (or rename) a file/directory within HDFS.
def move_or_rename(hdfs_src_path, hdfs_dst_path):
    """Rename *hdfs_src_path* to *hdfs_dst_path* on HDFS."""
    client = get_client()
    client.rename(hdfs_src_path, hdfs_dst_path)


# 返回目录下的文件信息
# Return the entries under an HDFS directory.
# NOTE: the name shadows the builtin ``list``; kept for existing callers.
def list(hdfs_path):
    """List the contents of the HDFS directory *hdfs_path*.

    :param hdfs_path: HDFS directory path.
    :return: the listing from ``HdfsClient.listdir``, or ``None`` when
        the path does not exist or is not a directory.
    """
    # Fix: ``resp`` was unbound on both exception paths, so the final
    # ``return resp`` raised UnboundLocalError instead of reporting.
    resp = None
    try:
        resp = get_client().listdir(hdfs_path)
        print_base_log(str(resp), hdfs_path + '下的目录文件信息', 'list')
    except NotADirectoryError:
        print_base_log("该" + hdfs_path + "不是文件夹", hdfs_path + '下的目录文件信息', 'list')
    except HdfsFileNotFoundException:
        print_base_log("该" + hdfs_path + "不存在", hdfs_path + '下的目录文件信息', 'list')
    return resp

#这里访问的是50070 端口
def get_client(hdfs_url='hdfs://node1:50070'):
    return  HdfsClient(hosts='node1:50070', user_name='root')

# Thin logging wrapper around the shared brush_util helper.
def print_base_log(obj, item='hdfs', option=''):
    """Log *obj* through brush_util, tagged with this module's name."""
    source_file = 'base_utils.py'
    bu.print_custom_masg(obj, item, option, source_file)

# Upload a local file to HDFS (the HDFS destination must not exist).
def put_to_hdfs_flag2(local_path, hdfs_path):
    """Copy *local_path* up to *hdfs_path* on HDFS.

    :param local_path: absolute path of the local source file.
    :param hdfs_path: HDFS destination; must not already exist.
    """
    # Consistency fix: reuse get_client() instead of duplicating the
    # hard-coded host/user connection settings inline.
    client = get_client()
    client.copy_from_local(local_path, hdfs_path)

亲测可行。就发出来给大家参考

猜你喜欢

转载自blog.csdn.net/qq_29499107/article/details/85337052