Basic HDFS operations from Python, with a simple example (Python 3, pyhdfs, Hadoop 2.6.4)
Tested from a Mac Pro against a self-built Hadoop cluster. Before testing, copy the hostname-to-IP mappings from the cluster's /etc/hosts into the Mac's /etc/hosts.
In a terminal on the Mac:
vi /etc/hosts
# Append the following to /etc/hosts
192.168.146.135 slave2
192.168.146.133 master
192.168.146.134 slave1
1 Requirements
The company is building a big-data platform. The data is produced by a vendor (A), who pushes the previous day's data to a designated company server via SFTP every day. (The push frequency may change: for example, the vendor might push the previous 12 hours of data every half day, i.e. twice a day, or the previous 3 minutes of data every 3 minutes.) The data team needs to land the pushed data in a designated directory on the Hadoop cluster, with a different directory for each day.
Breaking the requirement down (using one push per day as the example), the overall task is to push a CSV file to a designated location on the Hadoop cluster:
- Check whether the target directory exists on the cluster; if it does, delete all files under it, otherwise create it.
- Push the file from the local machine to the Hadoop cluster (once a day, twice a day, or many times a day).
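When the vendor pushes more than once a day, each push needs a distinct target filename inside the same daily directory, or later pushes would overwrite earlier ones. A minimal sketch of the path logic; the base directory and filename pattern here are assumptions for illustration, not the project's actual layout:

```python
from datetime import datetime

def target_path(base_dir, pushed_at):
    """Daily directory plus a per-push filename, so several pushes
    on the same day land as separate files instead of overwriting."""
    return "{}/{:%Y%m%d}/mcdsale_{:%H%M%S}.csv".format(base_dir, pushed_at, pushed_at)

print(target_path("/data/db", datetime(2018, 10, 20, 13, 49, 0)))
# → /data/db/20181020/mcdsale_134900.csv
```

With one push per day the time suffix is unnecessary; the daily directory alone is enough, which is what the project code below assumes.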
2 Introduction to pyhdfs
2.1 Connecting to HDFS and finding the active NameNode
class pyhdfs.HdfsClient(hosts='localhost', randomize_hosts=True, user_name=None, timeout=20, max_tries=2, retry_delay=5, requests_session=None, requests_kwargs=None)
| Parameter | Description |
|---|---|
| hosts (list or str) | List of NameNode HTTP host:port strings, either as a list or as a comma-separated string. Port defaults to 50070 if left unspecified. |
| randomize_hosts (bool) | By default, randomize host selection. |
| user_name | What Hadoop user to run as. Defaults to the HADOOP_USER_NAME environment variable if present, otherwise getpass.getuser(). |
| timeout (float) | How long to wait on a single NameNode in seconds before moving on. In some cases the standby NameNode can be unresponsive (e.g. loading fsimage or checkpointing), so we don't want to block on it. |
| max_tries (int) | How many times to retry a request for each NameNode. If NN1 is standby and NN2 is active, we might first contact NN1 and then observe a failover to NN1 when we contact NN2. In this situation we want to retry against NN1. |
| retry_delay (float) | How long to wait in seconds before going through the NameNodes again. |
| requests_session | A requests.Session object for advanced usage. If absent, this class will use the default requests behavior of making a new session per HTTP request. Caller is responsible for closing the session. |
| requests_kwargs | Additional **kwargs to pass to request. |
Parameter notes:
- hosts: NameNode host(s). Host and port are separated by a colon, e.g. hosts="45.91.43.237:50070"; multiple hosts can be passed as a comma-separated string or as a list, e.g. ["47.95.45.254:50070", "47.95.45.235:50070"]. The port must be the NameNode's WebHDFS HTTP port (50070 by default on Hadoop 2.x), not the RPC port.
- randomize_hosts: randomize host selection; defaults to True.
- user_name: the Hadoop user to connect as.
- timeout: how many seconds to wait on a single NameNode before moving on; defaults to 20.
- max_tries: how many times to retry each NameNode; defaults to 2.
- retry_delay: how many seconds to wait before cycling through the NameNodes again after a failure; defaults to 5.
- requests_session: the requests.Session used for the HTTP requests to HDFS; defaults to None (a new session per request).
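To make the hosts semantics concrete, here is a small helper that normalizes a hosts argument the way the docstring above describes it (list or comma-separated string, port defaulting to 50070). This is only an illustration of the documented behavior, not pyhdfs's internal code:

```python
def normalize_hosts(hosts, default_port=50070):
    """Expand a hosts argument (list or comma-separated string) into a
    list of 'host:port' strings, defaulting the port to 50070."""
    if isinstance(hosts, str):
        hosts = hosts.split(",")
    normalized = []
    for h in hosts:
        h = h.strip()
        if ":" not in h:
            h = "{}:{}".format(h, default_port)
        normalized.append(h)
    return normalized

print(normalize_hosts("master,slave1:50070"))
# → ['master:50070', 'slave1:50070']
```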
2.1.1 Connecting to the cluster's HDFS
pip install pyhdfs
from pyhdfs import HdfsClient
fs = HdfsClient(hosts='192.168.146.133:50070', user_name='root')
2.1.2 Finding the active NameNode in the cluster
print(fs.get_active_namenode())
Output:
192.168.146.133:50070
2.1.3 Listing the files under the cluster's root directory
print(fs.listdir("/"))
Output:
['data', 'tmp']
2.2 Working with directories and files on the Hadoop cluster
2.2.1 Checking whether a directory or file exists
print(fs.exists("/data/data_coe/data_asset/product/db/tmp"))
Output:
True
2.2.2 Creating a directory (recursively)
print(fs.mkdirs("/data/data_coe/data_asset/product/db/tmp"))
Output:
True
2.2.3 Uploading a local file to the cluster, and downloading a file from the cluster
# overwrite=True: if the file already exists on the cluster, overwrite it when uploading
fs.copy_from_local("python_hdfs.py", "/data/data_coe/data_asset/product/db/tmp/python_hdfs.py", overwrite=True)
fs.copy_to_local("/data/data_coe/data_asset/product/db/tmp/python_hdfs.py", "mcdsale.csv")
2.2.4 Opening a file on a remote node; returns an HTTPResponse object
response = fs.open("/data/data_coe/data_asset/product/db/tmp/python_hdfs.py")
print(response.data.decode())
2.2.5 append: appending to the end of an existing file on the HDFS cluster
# Append a string to the end of the file
fs.append("/data/data_coe/data_asset/product/db/tmp/mcdsale.csv", "\nhello world")
# Append the contents of a local file to the end of the file
with open("python_hdfs.py", "r") as f:
    fs.append("/data/data_coe/data_asset/product/db/tmp/mcdsale.csv", f.read().encode())
response = fs.open("/data/data_coe/data_asset/product/db/tmp/mcdsale.csv")
print(response.data.decode())
3 Project code (kept brief; the full project code will be posted later)
# -*- coding: utf-8 -*-
# @Time : 2018/10/20 1:49 PM
# @Author : Einstein Yang!!
# @Nickname : 穿着开裆裤上大学
# @FileName: python_hdfs.py
# @Software: PyCharm
# @PythonVersion: python3.5
# @Blog : https://blog.csdn.net/weixin_41734687
import pyhdfs


def put_file_to_hdfs(csv_path):
    # Push the file to the target HDFS path: if the path exists, delete it
    # and everything under it, then recreate it as an empty directory.
    fs = pyhdfs.HdfsClient(hosts="192.168.146.133:50070", user_name="root")
    if fs.exists("/data/data_coe/data_asset/product/db/tmp/"):
        fs.delete("/data/data_coe/data_asset/product/db/tmp/", recursive=True)
    fs.mkdirs("/data/data_coe/data_asset/product/db/tmp/")
    fs.copy_from_local(csv_path, "/data/data_coe/data_asset/product/db/tmp/mcdsale.csv", overwrite=True)


if __name__ == '__main__':
    put_file_to_hdfs("mcdsale.csv")
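The script above always writes to the same tmp directory, while the requirement calls for a different directory per day. A sketch of a date-aware variant, with the client passed in as a parameter so the path logic stays testable without a cluster; the base path is an assumption, and any object exposing the same pyhdfs methods can stand in for HdfsClient:

```python
from datetime import date

# Hypothetical base path; the real layout is environment-specific.
BASE_DIR = "/data/data_coe/data_asset/product/db"

def daily_dir(day):
    # One HDFS directory per day, e.g. .../20181020
    return "{}/{:%Y%m%d}".format(BASE_DIR, day)

def put_file_to_hdfs_daily(fs, csv_path, day):
    """Recreate the day's directory and upload the CSV into it.

    fs is a pyhdfs.HdfsClient (or any object with the same
    exists/delete/mkdirs/copy_from_local methods).
    """
    hdfs_dir = daily_dir(day)
    if fs.exists(hdfs_dir):
        fs.delete(hdfs_dir, recursive=True)
    fs.mkdirs(hdfs_dir)
    fs.copy_from_local(csv_path, hdfs_dir + "/mcdsale.csv", overwrite=True)
```

A cron job (or the SFTP landing hook) could then call put_file_to_hdfs_daily(fs, path, date.today()) once per push.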