分布式处理框架MapReduce
目录
本文是慕课网大数据的学习笔记与总结:
1.MapReduce概述
1.优点:海量数量离线处理&易开发&易运行
2.缺点:没办法满足实时流式计算
2.MapReduce编程模型
wordCount:统计文件中,每个单词出现的次数
需求:求wordCount
1.文件内容小:可以用shell解决
2.文件内容很大:达到TB,GB级别==>利用分布式框架解决:如mapReduce
核心概念
Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
HDFS:blockSize 是HDFS中最小的存储单元默认为128M
默认情况下:split和HDFS的Block他们两个是一一对应的关系,当然也可以手工设置(不建议)
InputFormat:将我们的输入数据进行分片(split)
TextInpuFormat:处理文本格式的数据
OutputFormat:与上面InputFormat对应处理输出
Combiner,Partitioner 下文会介绍到
3.MapReduce架构
MapReduce1.x的架构
1.JobTracker:JT
作业的管理者
将作业分解成一堆的任务:Task(MapTask和ReduceTask)
将任务分派给TaskTracker运行
作业的监控,容错处理(task作业挂了,重启task的机制)
在一定的时间间隔内,JT没有收到TT的心跳信息,TT可能是挂了,TT上运行的任务会被指派到其他TT上去执行
2.TaskTracker:TT
任务的执行者 干活的
在TT上执行我们的Task(MapTask和
ReduceTask)
会与JT进行交互:执行/启动/停止作业,发送心跳信息给JT
3.MapTask
自己开发的map任务交由该Task处理
解析每条记录的数据,交给自己的map方法处理
将map的输出结果写到本地磁盘
4.ReduceTask
将Map Task输出的数据进行读取
按照数据进行分组,分组之后传给我们自己编写的reduce方法处理
输出结果写到hdfs
4.MapReduce编程
1.使用MapReduce开发WordCount应用程序
public class WordCountApp{ //Map:读取输入文件 //longWritable本质上就是存储长整型的值 //LongWritable 文件的偏移量 Text是一行一行的数据 //Text 为输入的数据经过map操作过后的输出 即是单词,所以是字符串,所以在hadoop中为Text类型 //最后一个泛型:longWritable记录该单词出现的频率 //前面两个参数代表输入的参数,后面两个代表map作业的输出类型 public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ LongWritable one=new LongWritable(1); //LongWritable key 偏移量 //Text value 每一行的字符串 protected void map(LongWritable key,Text value,Context context) throws IoException{ //接收到的每一行数据,给他转成string类型方便以后操作 String line=value.toString(); //按照指定分隔符进行拆分 String[] words=line.split(" "); for(String word:words) { //通过上下文把map的处理结果输出 context.write(new Text(word),one); } } } //Reduce:归并操作 public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ protected void reduce(Text key,Iterable<LongWritable>values,Context context)throws Excpetion{ //统计单词次数 long sum=0; for(LongWritable value:values){ sum+=value.get(); } //最终统计结果的输出 context.write(key,new LongWritable(sum)); } } //定义Driver:封装了MapReduce作业的所有信息 public static void main(String[] args) { //创建Configuration Configuration configuration=new Configuration(); //创建作业 Job job=job.getInstance(configuration,"wordcount"); //设置Job的处理类 job.setJarByClass(WordCountApp.class); //设置作业处理的输入路径 FileInputFormat.setInputPaths(job,new Path(args[0])); //设置map相关参数 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置reduce相关参数 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //设置作业处理的输出路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); //提交 System.exit(job.waitForCompletion(true)?0:1); } }
1.1打成jar包
1.2将jar包上传到服务器 ~/lib
#1.3 运行jar包 cd /home/hadoop/lib hadoop jar 2018-7-15_hdfs_02-0.0.1-SNAPSHOT.jar test.MapReduceApp hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/output/wc
问题:相同的代码和脚本再次执行,会报错
在MR中,输出文件是不能实现存在的
1.先通过shell的方式将输出文件夹先删除
2.在代码中完成自动删除(推荐)
在main方法中添加
Path outputPath=new Path(args[1]); FileSystem fileSystem=FileSystem.get(configuration); if(fileSystem.exists(outputPath)){ fileSystem.delete(outputPath,true); System.out.println("output file exists,but is has deleted"); }
2.MapReduce编程之Combiner
本地的reducer
减少Map Tasks输出的数据量及数据网络传输量
对上面代码进行改动加入combiner
//Reduce:归并操作 public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ protected void reduce(Text key,Iterable<LongWritable>values,Context context)throws Excpetion{ //统计单词次数 long sum=0; for(LongWritable value:values){ sum+=value.get(); } //最终统计结果的输出 context.write(key,new LongWritable(sum)); } } //定义Driver:封装了MapReduce作业的所有信息 public static void main(String[] args) { //创建Configuration Configuration configuration=new Configuration(); //创建作业 Job job=job.getInstance(configuration,"wordcount"); //设置Job的处理类 job.setJarByClass(WordCountApp.class); //设置作业处理的输入路径 FileInputFormat.setInputPaths(job,new Path(args[0])); //设置map相关参数 job.setMapperClass(MyMapper.class); job.setMapOutputClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置reduce相关参数 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //通过job设置combiner处理类,其实逻辑上和我们的reduce是一模一样 job.setCombinerClass(MyReducer.class); //设置作业处理的输出路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); //提交 System.exit(job.waitForCompletion(true)?0:1); } }
使用场景:
求和,次数
平均数(不可用)
3.MapReduce编程之Partitioner
Partitioner决定MapTask输出的数据交由那个ReduceTask处理
默认实现:分发的key的hash值对ReduceTask个数取模
案例:需求有两天的手机销售额,输出相同品牌手机销售额的和
//1 创建mapper处理类 public class MyMapper extends Mapper<LongWritable,Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //将value转化成字符串 String valueStr = value.toString(); //按照空格划分 String[] strArray = valueStr.split(" "); //输出 context.write(new Text(strArray[0]), new LongWritable(Long.valueOf(strArray[1]))); } }
//创建reducer处理类 public class MyReducer extends Reducer<Text, LongWritable,Text, LongWritable>{ @Override protected void reduce(Text text, Iterable<LongWritable> it,Context context) throws IOException, InterruptedException { long sum=0L; for (LongWritable longWritable : it) { sum+=longWritable.get(); } context.write(text, new LongWritable(sum)); } }
//3 创建partitioner处理类 public class MyPartitioner extends Partitioner<Text, LongWritable>{ @Override public int getPartition(Text key, LongWritable value, int numPartitions) { String strKey=key.toString(); if(strKey.equals("xiaomi")) { return 0; } else if(strKey.equals("huawei")) { return 1; } else if(strKey.equals("iphone7")) { return 2; } else { return 3; } } }
//4 创建执行类 public class Partitioner { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); //解决输出文件存在 FileSystem fileSystem = FileSystem.get(configuration); if(fileSystem.exists(new Path(args[1]))) { fileSystem.delete(new Path(args[1]), true); System.out.println("the file exist,but it is deleted"); } //设置作业 Job job = Job.getInstance(configuration, "partitioner"); //设置处理类 job.setJarByClass(Partitioner.class); //设置输入路径 FileInputFormat.setInputPaths(job, args[0]); //设置map相关配置 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置reduce相关配置 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置combiner job.setCombinerClass(MyReducer.class); //设置partitioner job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(4); //设置输出 FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
#5 将java文件打成jar包 #6 将jar包放到linux服务器上 #7 运行该jar包 cd /home/hadoop/lib hadoop jar 2018-7-15_mapreduce_01-0.0.1-SNAPSHOT.jar com.mapreduce.Partitioner hdfs://hadoop000:8020/partitioner.txt hdfs://hadoop000:8020/output/phoneSale
4.jobHistory
记录已经运行完的MapReduce信息到指定的HDFS目录下
默认是不开启的
开启步骤
1.需要在配置文件中进行配置
cd /home/hadoop/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop vim mapred-site.xml
<property> <name>mapreduce.jobhistory.address</name> <value>hadoop000:10020</value> <description>MapReduce JobHistory Server IPC host:port</description> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>hadoop000:19888</value> <description>MapReduce JobHistory Server Web UI host:port</description> </property> <property> <name>mapreduce.jobhistory.done-dir</name> <value>/history/done</value> </property> <property> <name>mapreduce.jobhistory.intermediate-done-dir</name> <value>/history/done_intermediate</value> </property>
2.重启yarn服务
cd /home/hadoop/app/hadoop-2.6.0-cdh5.7.0/sbin ./stop-yarn.sh ./start-yarn.sh
3.开启 jobhistory
进入到hadoop的sbin目录下
./mr-jobjobhistory-daemon.sh start historyserver
4.验证
jsp出现jobhistory进程
5.进行测试
cd /home/hadoop/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce #执行mapreduce案例 hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3
6.打开网页查看结果,并点击history
跳转到如下页面说明配置成功
问题:点击logs出现错误信息
6. 配置yarn的聚合功能
vim yarn-site.xml
<!--添加下面参数--> <property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property>