# -*- coding: utf-8 -*-
#################################################################
#################################################################
#################################################################
#### 615 Minghui Technology
### Common utilities shared by all sub-projects; no business logic here.
#### Author: Cao Mingjie
# --------------------- HDFS helpers ------------------------ #
import os

from pyhdfs import HdfsClient, HdfsFileNotFoundException

from brushtickes.com.mh.brush.brushutils \
    import brush_util as bu

# The original author pointed at the hdfscli API reference:
#   https://hdfscli.readthedocs.io/en/latest/api.html
# NOTE(review): this module actually uses the `pyhdfs` package, whose API is
# documented at https://pyhdfs.readthedocs.io/en/latest/pyhdfs.html


def read_hdfs_file(filename, local_path='', **kwargs):
    """Read an HDFS file, optionally append it to a local file, and return it.

    :param filename: HDFS path of the file to read.
    :param local_path: Local file to append the content to; when it is the
        empty string (default) nothing is written locally.
    :param kwargs: Forwarded unchanged to ``HdfsClient.open``. The original
        docstring lists ``offset`` (starting byte position), ``length``
        (number of bytes to process) and ``buffersize`` (transfer buffer
        size).
    :return: The ``data`` attribute of the response returned by
        ``HdfsClient.open`` (presumably ``bytes`` -- confirm against pyhdfs).
    """
    response = get_client().open(filename, **kwargs)
    content = response.data
    print(content)
    if local_path != '':
        # Choose text or binary append mode from the payload type instead of
        # provoking a TypeError, and close the file handle deterministically.
        mode = 'a' if isinstance(content, str) else 'ab'
        with open(local_path, mode) as local_file:
            local_file.write(content)
    return content


def mkdirs(hdfs_path):
    """Create the directory ``hdfs_path`` on HDFS."""
    get_client().mkdirs(hdfs_path)


def delete_hdfs_file(hdfs_path):
    """Delete ``hdfs_path`` from HDFS."""
    get_client().delete(hdfs_path)


def put_to_hdfs_no_flag(local_path, hdfs_path):
    """Upload a local file to HDFS without renaming the local file.

    :param local_path: Path of the local source file.
    :param hdfs_path: Destination path on HDFS.
    """
    # copy_from_local takes (local source, HDFS destination); the previous
    # code passed the two paths in the opposite order.
    upload = get_client().copy_from_local(local_path, hdfs_path)
    print_base_log(upload, '上传文件到hdfs 已经完成', 'put_to_hdfs_no_flag')


def put_to_hdfs_flag(local_path, hdfs_path):
    """Upload a local file to HDFS, then mark the local file as uploaded.

    After the upload the local file is renamed to
    ``success_<name><timestamp>`` inside its own directory.

    :param local_path: Path of the local source file.
    :param hdfs_path: Destination path on HDFS.
    """
    upload = get_client().copy_from_local(local_path, hdfs_path)
    # Prefix only the file name: prefixing the whole path would produce an
    # invalid target whenever local_path contains a directory component.
    directory, base_name = os.path.split(local_path)
    flagged_name = "success_" + base_name + str(bu.get_new_time())
    os.rename(local_path, os.path.join(directory, flagged_name))
    print_base_log(upload, '上传文件到hdfs,并把本地上传的文件标志位 success 已经完成', 'put_to_hdfs_flag')


def get_from_hdfs(hdfs_path, local_path):
    """Download ``hdfs_path`` from HDFS to ``local_path``."""
    get_client().copy_to_local(hdfs_path, local_path)


def append_to_hdfs(hdfs_path, data):
    """Append ``data`` to the HDFS file ``hdfs_path``."""
    get_client().append(hdfs_path, data)


def write_to_hdfs(hdfs_path, data):
    """Write ``data`` to ``hdfs_path``, replacing any existing file.

    The previous implementation deleted the file and then appended to it,
    which cannot work because the append target no longer exists (or never
    existed). Creating with ``overwrite=True`` covers both cases in one call.
    """
    get_client().create(hdfs_path, data, overwrite=True)


def move_or_rename(hdfs_src_path, hdfs_dst_path):
    """Move or rename ``hdfs_src_path`` to ``hdfs_dst_path`` on HDFS."""
    get_client().rename(hdfs_src_path, hdfs_dst_path)


# NOTE: this name shadows the built-in ``list`` inside this module; it is
# kept unchanged because callers may already depend on it.
def list(hdfs_path):
    """Log and return the names of the entries under ``hdfs_path``.

    :param hdfs_path: HDFS directory to list.
    :return: The result of ``HdfsClient.listdir``; an empty list when the
        path is not a directory or does not exist (the problem is logged).
    """
    # Pre-bind the result so the handled error paths return a value instead
    # of raising UnboundLocalError at the return statement.
    resp = []
    try:
        resp = get_client().listdir(hdfs_path)
        print_base_log(str(resp), hdfs_path + '下的目录文件信息', 'list')
    except NotADirectoryError:
        print_base_log("该" + hdfs_path + "不是文件夹", hdfs_path + '下的目录文件信息', 'list')
    except HdfsFileNotFoundException:
        print_base_log("该" + hdfs_path + "不存在", hdfs_path + '下的目录文件信息', 'list')
    return resp


# The client connects to port 50070 (presumably the NameNode HTTP/WebHDFS
# port -- confirm for the target cluster).
def get_client(hdfs_url='hdfs://node1:50070'):
    """Build an ``HdfsClient`` for the given address, acting as user ``root``.

    :param hdfs_url: Address as ``host:port``, optionally prefixed with a
        scheme such as ``hdfs://``. The default resolves to ``node1:50070``,
        the value that was previously hard-coded.
    :return: A new ``HdfsClient`` instance.
    """
    # HdfsClient expects "host:port", so drop any scheme and trailing slash.
    hosts = hdfs_url.split('://', 1)[-1].rstrip('/')
    return HdfsClient(hosts=hosts, user_name='root')


def print_base_log(obj, item='hdfs', option=''):
    """Forward a log entry to ``brush_util.print_custom_masg`` for this file.

    :param obj: Object or message to log.
    :param item: Label passed as the second argument (default ``'hdfs'``).
    :param option: Label passed as the third argument; callers in this module
        pass an operation name.
    """
    bu.print_custom_masg(obj, item, option, 'base_utils.py')


def put_to_hdfs_flag2(local_path, hdfs_path):
    """Upload a local file to HDFS without logging or renaming.

    Per the original author's note: ``local_path`` is the absolute path of
    the local file, and the HDFS destination must not already exist.
    """
    # Reuse the shared factory rather than duplicating the connection details.
    get_client().copy_from_local(local_path, hdfs_path)
# 亲测可行,就发出来给大家参考。