Copyright notice: this is the blogger's original article; reproduction without the blogger's permission is prohibited. https://blog.csdn.net/u012292754/article/details/83514517
1 MapReduce advantages
- Offline processing of massive data sets
- Easy to develop and easy to run
2 MapReduce programming model
- A job is split into a Map phase and a Reduce phase
- Map phase: Map Tasks
- Reduce phase: Reduce Tasks
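The two phases can be sketched end-to-end in plain Java — a standalone simulation of map → shuffle/group → reduce for wordcount, not Hadoop code (class and method names here are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PhasesSketch {
    // Map phase: each input line is turned into (word, 1) pairs.
    // Shuffle: pairs are grouped by key.
    // Reduce phase: the values for each key are summed.
    public static Map<String, Long> wordCount(List<String> lines) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {                 // map + shuffle
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1L);
            }
        }
        Map<String, Long> result = new TreeMap<>(); // reduce
        for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
            long sum = 0;
            for (long v : e.getValue()) sum += v;
            result.put(e.getKey(), sum);
        }
        return result;
    }
}
```

In a real job the three stages run on different machines; the shuffle is what guarantees that all values for one key reach the same reducer.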
2.1 wordcount example
2.2 Core concepts
- Split: the chunk of data handed to a MapReduce job for processing; the smallest unit of computation in MapReduce. The HDFS blocksize (128 MB by default) is the smallest unit of storage in HDFS. By default, Splits and blocks correspond one-to-one; the relationship can also be configured manually (not recommended).
- InputFormat
- OutputFormat
- Combiner
- Partitioner
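The split/block relationship above can be sketched with a little arithmetic — a standalone illustration, not Hadoop code (the class name is made up): with the default one-split-per-block mapping, the number of splits (and hence MapTasks) for a file is the number of 128 MB blocks it occupies.

```java
public class SplitCountSketch {
    static final long BLOCK_SIZE = 128L * 1024 * 1024; // HDFS default block size

    // Number of splits for a file under the default one-to-one
    // split/block mapping: ceil(fileSize / blockSize).
    public static long numSplits(long fileSizeBytes) {
        return (fileSizeBytes + BLOCK_SIZE - 1) / BLOCK_SIZE;
    }
}
```

A 300 MB file, for example, occupies 3 blocks and therefore gets 3 splits and 3 MapTasks.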
3 MapReduce architecture
3.1 Version 1.x
- JobTracker (JT): the job manager. Splits a job into a set of tasks (MapTask, ReduceTask); assigns tasks to TaskTrackers; monitors jobs and handles faults (if a task dies, there is a mechanism to restart it). If the JT receives no heartbeat from a TT within a certain interval, the TT is presumed dead, and the tasks assigned to that TT may be re-assigned to other TTs.
- TaskTracker (TT): the task executor. Runs Tasks (MapTask, ReduceTask) on its node; interacts with the JT to launch, run, and stop tasks; sends heartbeat messages to the JT.
- MapTask: runs the user-written map code; writes the map output to local disk.
- ReduceTask: reads the data output by the MapTasks; groups the data by key and passes it to the user-written reduce method.
3.2 MapReduce 2.x
3 wordcount example
3.1 Source code
package com.bzt.cn.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
 * MapReduce version of wordcount
 */
public class WordCountApp {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }

    /*
     * Reducer: the merge/aggregation step
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // sum must be a local variable: as an instance field it would keep
            // accumulating across different keys
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // create the job
        Job job = Job.getInstance(conf, "WC");
        // set the class that carries the job
        job.setJarByClass(WordCountApp.class);
        // set the job's input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // map-side settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // reduce-side settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // set the job's output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3.2 Package into a jar with maven and upload it to the cluster
Run: [hadoop@node1 ~]$ hadoop jar wordcount.jar com.bzt.cn.mapreduce.WordCountApp hdfs://node1:8020/hello.txt hdfs://node1:8020/wcout
[hadoop@node1 ~]$ hadoop fs -ls /wcout
18/10/30 09:38:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2018-10-30 09:38 /wcout/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 30 2018-10-30 09:38 /wcout/part-r-00000
[hadoop@node1 ~]$ hadoop fs -text /wcout/part-r-00000
18/10/30 09:39:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello 4
jerry 5
tom 7
world 8
[hadoop@node1 ~]$
3.4 Enhanced version: clean up the output directory before running
// note: this version additionally requires import org.apache.hadoop.fs.FileSystem;
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    // delete the output directory if it already exists, so the job can be re-run
    Path outputPath = new Path(args[1]);
    FileSystem fileSystem = FileSystem.get(conf);
    if (fileSystem.exists(outputPath)) {
        fileSystem.delete(outputPath, true); // true = delete recursively
        System.out.println("output file deleted!");
    }
    // create the job
    Job job = Job.getInstance(conf, "WC");
    // set the class that carries the job
    job.setJarByClass(WordCountApp.class);
    // set the job's input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    // map-side settings
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    // reduce-side settings
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // set the job's output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4 Combiner
- A local reducer that runs on the map side
- Reduces the amount of data the Map Tasks output, and therefore the amount of data transferred over the network
- Suitable scenarios: sums, counts
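For wordcount the combiner can simply reuse the reducer logic (in Hadoop this is wired up with job.setCombinerClass(...)). The effect can be sketched in plain Java — a standalone illustration with a made-up class name, not Hadoop code:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerSketch {
    // Locally pre-aggregate (word, count) pairs on the map side, so that e.g.
    // two ("hello", 1) records become a single ("hello", 2) record before
    // being shuffled over the network to the reducers.
    public static Map<String, Long> combine(List<Map.Entry<String, Long>> mapOutput) {
        Map<String, Long> combined = new TreeMap<>();
        for (Map.Entry<String, Long> e : mapOutput) {
            combined.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return combined;
    }
}
```

This is why combiners suit sums and counts: partial sums of partial sums give the same final answer, which is not true of every aggregate (averaging partial averages, for instance, is wrong).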
5 Partitioner
- The Partitioner decides which ReduceTask handles each record a MapTask outputs
- Default implementation: the hash of the key being distributed, modulo the number of Reduce Tasks
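The default hash-mod rule can be sketched in a standalone way — an illustration of the rule with a made-up class name, not Hadoop's actual HashPartitioner class:

```java
public class HashPartitionSketch {
    // Default partitioning rule: hash of the key, masked to be non-negative,
    // modulo the number of reduce tasks.
    public static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

Because the same key always hashes to the same partition, one ReduceTask is guaranteed to see all values for a given key.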
5.1 Test data
5.2 Source code
package com.bzt.cn.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionerApp {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line is "<animal> <count>", e.g. "dog 5"
            String line = value.toString();
            String[] words = line.split(" ");
            context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
        }
    }

    /*
     * Reducer: the merge/aggregation step
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // sum must be a local variable: as an instance field it would keep
            // accumulating across different keys
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static class MyPartitioner extends Partitioner<Text, LongWritable> {
        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            if (key.toString().equals("dog")) {
                return 0;
            }
            if (key.toString().equals("cat")) {
                return 1;
            }
            if (key.toString().equals("duck")) {
                return 2;
            }
            // every other animal goes to the last partition
            return 3;
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // delete the output directory if it already exists, so the job can be re-run
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true); // true = delete recursively
            System.out.println("output file deleted!");
        }
        // create the job
        Job job = Job.getInstance(conf, "WC");
        // set the class that carries the job
        job.setJarByClass(PartitionerApp.class);
        // set the job's input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // map-side settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // reduce-side settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // set the job's Partitioner
        job.setPartitionerClass(MyPartitioner.class);
        // 4 reducers, one per partition
        job.setNumReduceTasks(4);
        // set the job's output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
5.3 Package with maven and run on the cluster
[hadoop@node1 ~]$ hadoop jar part.jar com.bzt.cn.mapreduce.PartitionerApp hdfs://node1:8020/animal.txt hdfs://node1:8020/partionerout
[hadoop@node1 ~]$ hadoop fs -ls /partionerout
18/10/30 10:39:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r-- 1 hadoop supergroup 0 2018-10-30 10:38 /partionerout/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 6 2018-10-30 10:38 /partionerout/part-r-00000
-rw-r--r-- 1 hadoop supergroup 6 2018-10-30 10:38 /partionerout/part-r-00001
-rw-r--r-- 1 hadoop supergroup 7 2018-10-30 10:38 /partionerout/part-r-00002
-rw-r--r-- 1 hadoop supergroup 8 2018-10-30 10:38 /partionerout/part-r-00003
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00000
18/10/30 10:40:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
dog 7
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00001
18/10/30 10:40:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
cat 6
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00002
18/10/30 10:40:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
duck 7
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00003
18/10/30 10:40:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
lion 13
[hadoop@node1 ~]$
6 jobhistory
- Records information about completed MapReduce jobs in a designated HDFS directory
- Disabled by default
6.1 Configure jobhistory
/home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/etc/hadoop
mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>node1:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>node1:19888</value>
</property>
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/history/done</value>
</property>
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/history/done_intermediate</value>
</property>
</configuration>
6.2 Start the history server
Restart yarn first, then:
[hadoop@node1 ~]$ mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/logs/mapred-hadoop-historyserver-node1.out
[hadoop@node1 ~]$ jps
6704 JobHistoryServer
6738 Jps
1395 DataNode
6245 ResourceManager
1271 NameNode
1559 SecondaryNameNode
6346 NodeManager
[hadoop@node1 ~]$
6.3 Test
[hadoop@node1 ~]$ cd /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce2
[hadoop@node1 mapreduce2]$ hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3
Visit http://node1:19888/jobhistory
Click into the job to view its logs.
The logs are not available yet because log aggregation is not enabled.
Configure yarn-site.xml:
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
</configuration>
Restart yarn and run the pi example again.