(1)编写 mapper 程序 hdfs_map.py, 代码如下:
import sys
def read_input(file):
for line in file:
yield line.split()
def main():
data = read_input(sys.stdin)
for words in data:
for word in words:
print("%s%s%d" % (word, '\t', 1))
if __name__ == '__main__':
main()
(2)编写 reducer 程序 hdfs_reduce.py, 代码如下:
import sys
from operator import itemgetter
from itertools import groupby
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main():
data = read_mapper_output(sys.stdin)
for current_word, group in groupby(data, itemgetter(0)):
total_count = sum(int(count) for current_word, count in group)
print("%s%s%d" % (current_word, '\t', total_count))
if __name__ == "__main__":
main()
(3)创建一个输入文件 mk.txt , mk.txt 内容如下:
chen guo wei chen guo
go go python java c++
c++ java pthread pthread
process process
(4)使用 hdfs 把 mk.txt 上传的 hdfs 中。
hdfs dfs -mkdir /test
hdfs dfs -copyFromLocal mk.txt /test
(5)使用如下命令来使用 hadoop 执行 mapper、reducer 程序。
/Users/chenguowei/softwares/hadoop-2.8.1/bin/hadoop jar
/Users/chenguowei/softwares/hadoop-2.8.1/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar
-files "hdfs_map.py,hdfs_reduce.py"
-input /test/mk.txt -output /tmp/wordcounttest
-mapper "python3 hdfs_map.py" -reducer "python3 hdfs_reduce.py”
(6)查看终端输出
(7)使用 hdfs 来查看结果
因为在第(6)步是指定了 -output //tmp/wordcounttest ,所以我们使用 hdfs dfs -ls /tmp/wordcounttest ,可以得到两个文件
-rw-r--r-- 1 chenguowei supergroup 0 2018-07-22 11:42 /tmp/wordcounttest/_SUCCESS
-rw-r--r-- 1 chenguowei supergroup 66 2018-07-22 11:42 /tmp/wordcounttest/part-00000
其中 _SUCCESS 保存的是日志,而 part_00000才是结果。
可以使用 hdfs dfs -cat /temp/wordcounttest/part-00000 来查看结果