现在已有的很多博客demo都是以wordcount为例,众所周知这是一个非常简单的功能,但凡遇到一些高阶一点的操作我都会大脑一片空白,今天正好有相关的需求,就来学习了一下。
http://www.zhangdongshengtech.com/article-detials/236
上面的链接是记录频次的demo,写的非常的好,相信各位看了它就会了解mapreduce核心的写法
Intro:wordcount
说在前面:mapreduce程序的调试可以单独分别运行mapper和reducer,直接在命令行输入你指定好的输入格式,就会打印出输出
mapper.py
输入文件的形式就是
word1
word2
word1
word3
# coding=utf-8
import sys
for line in sys.stdin:
words = line.strip().split('|')
try:
his = data['uP_cat']
for vid, fre in his.items():
if vid[0] != 'V': continue
print(vid)
except:
continue
reducer.py
这里实现的就是一个简单的计数并把频次写到文件中的操作。
如果你只需要实现计数操作,那么只用修改mapper.py的print的值即可
# coding=utf-8
import sys
count = 0
key = ""
current_key = ""
for line in sys.stdin:
line = line.rstrip()
if not line:
sys.stderr.write("data is wrong")
sys.exit(1)
line = line.rstrip()
items = line.split("\t")
current_key = items[3]
cur_timestamp = items[2]
if current_key == key:
if cur_timestamp < timestamp:
print "%s\t%d" % (key, count)
count = 0
key = current_key
count += 1
if key:
print "%s\t%d" % (key, count)
run.sh
运行环境配置这一块我可能没有办法讲清楚,因为是别人写好的脚本,我只修改了上面2个代码
在这里修改你的输入路径和输出路径
#!/bin/bash
HADOOP_bin='/your/path/hadoop-2.7.3/bin/hadoop'
INPUT_PATH="input_data"
OUTPUT_PATH="test"
$HADOOP_bin fs -rmr $OUTPUT_PATH
$HADOOP_bin jar /your/path/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar\
-D mapred.job.priority="VERY_HIGH"\
-D mapred.reduce.tasks=200\
-D mapreduce.job.queuename=root.online.default\
-D mapred.job.map.capacity=400\
-D mapred.job.reduce.capacity=100\
-D mapred.job.name="test"\
-D mapred.textoutputformat.ignoreseparator="true"\
-input ${INPUT_PATH} \
-output ${OUTPUT_PATH} \
-file ./mapper.py\
-file ./reducer.py\
-partitioner "org.apache.hadoop.mapred.lib.HashPartitioner"\
-mapper "python mapper.py"\
-reducer "python reducer.py"\
-inputformat "org.apache.hadoop.mapred.TextInputFormat"\
-outputformat "org.apache.hadoop.mapred.TextOutputFormat"\
Advance:有条件的合并内容
下面来实现把输入按某一个值进行合并
输入形式:
key1 value1 value2
key2 value1 value2
key1 value3 value4
输出形式:
key1 value1 value2 value3 value4…………
key2 value1 value2 …………
mapper.py
#coding=utf8
import json
import sys
#f = open('part-07198', 'r') #调试用,因为我的mapreduce任务配置在python2下,调试的时候sys.stdin接收不到输入,所以直接读文件
for line in sys.stdin:
line = line.strip()
if not line:
continue
data = line.split('\t', 2) #只区分key,后面的values不做区分
if len(data) <= 1:
continue
print data
这里如果怕数据有问题可以写在try except里,如果还需要对每一行的数据做什么处理都放在mapper里处理,当把数据预处理成可以根据某一项进行合并时就print输出,丢给reducer
reducer.py
import json
import sys
from operator import itemgetter
from itertools import groupby
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 2)
def main(separator='\t'):
#f = open('part-07198', 'r') #调试用
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
for name, group in groupby(data, itemgetter(0)):
val = []
for values in group:
for v in values[1]:
val.append(v)
print "%s\t%s"% (values[0], json.dumps(v))
if __name__ == "__main__":
main()
这里有两个函数非常重要,搞懂了它们你就能搞懂如何写reducer,再复杂的功能你都能变着花实现
groupby
https://blog.csdn.net/Together_CZ/article/details/73042997
key, group= groupby(iterator, key=func())
将key函数作用于原循环器的各个元素。根据key函数结果,将拥有相同函数结果的元素分到一个新的循环器。每个新的循环器以函数返回结果为标签。
这就好像一群人的身高作为循环器。我们可以使 用这样一个key函数: 如果身高大于180,返回"tall";如果身高底于160,返回"short";中间的返回"middle"。最终,所有身高将分为三个循环器, 即"tall", “short”, “middle”。
这个函数的意思就是说把原来的迭代器中的值按照某一个key聚合
再来复习一下mapreduce的原理:
hadoop框架会自动的将相同的key分配到同一个reducer上,这个key,默认的就是上一个mapper输出数据的以\t,或者\001分割后的第一部分
看到这里大概应该就明白了,只要我们把所需要合并的key在mapper中变换到第一位输出,这样就能用groupby直接进行聚合
那么,groupby的输出是什么呢?
groupby的输出有两部分,一部分是key,另一部分就是同一key下的所有data,这里的data同样含有key这个字段
看一下https://blog.csdn.net/LY_ysys629/article/details/72553273这个实例应该就能对groupby的输出有直观感受了。
itemgetter
我觉得这篇博客的理解写的非常好https://blog.csdn.net/qq_22022063/article/details/79019294
作用:itemgetter 用于获取对象的哪些位置的数据,参数即为代表位置的序号值,
也就是说itemgetter的参数等于i,就取data[i]的数据,并且它是一个函数,因此可以直接用作groupby的key传参。
中文字符的处理
我已经被这个坑了两次了,记录一下
(1)文件开头一定要记得#coding=utf8
(2)如果保存的文件是’\u’开头的,解析的时候用json.loads能直接解析出中文
(3)如果保存的文件是’\x’开头的。。。可能在解析的时候要decode(‘utf-8’)吧。。。