简介:
主要用于搜索领用,解决海量数据的计算问题。Map和Reduce都是独立的阶段。
处理多余10PB数据时趋向于变慢。
基于网络IO和磁盘IO计算的。(spark基于内存计算的)
模型:
- 计算海量的数据,不能在同一个机器上计算。
- 移动计算,不移动数据。
- 两个阶段
- map:映射阶段
- reduce:计算阶段
如何计算:
在每个datanode上进行计算,但datanode节点占用的资源非常大,所以就引出了资源调度管理(yarn),专门为MapReduce分配资源。
yarn的学习
- yarn是一个资源调度平台,给应用程序分配资源(cpu,内存,带宽。。。)
- yarn的组成
- resourceManager(资源管理器)
- 接受客户端提交作业的请求
- 为走也在nodeManager中分配执行的资源
- NodeManger(节点管理器)
- 执行job的节点
- 一般默认nodeManager都在DataNode上(移动计算,不移动数据)
- resourceManager(资源管理器)
MaperReduce的运行环境
- 本地
- yarn
MaperReduce的编写
- 编写map
-
package com.wt.bigdata; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @description: * @author:wt * @createTime:2021/11/29 13:51 * @version:1.0 */ public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { /** * @description: * key 一行的偏移量 * value 偏移量对应的值 * @author wt * @date 2021/11/29 15:36 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取数据中的一行数据 String[] words = value.toString().split(","); // 扔进盒子中,自身提供了一个context分区,将计算出来的数据记录出来 for (String word: words){ // map进行映射,完了之后传给reduce,进行计算 context.write(new Text(word) , new IntWritable(1)); } } }
-
- 编写reduce
-
package com.wt.bigdata; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @description: * @author:wt * @createTime:2021/11/29 13:51 * @version:1.0 */ public class WordCountReduce extends Reducer<Text , IntWritable ,Text , IntWritable > { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0 ; for (IntWritable i: values){ sum += i.get(); } context.write(key, new IntWritable(sum)); } }
-
- 编写提交作业
-
package com.wt.bigdata; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * @description: * @author:wt * @createTime:2021/11/29 16:38 * @version:1.0 */ public class WordCountJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 创建一个作业对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 设置map和reduce job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReduce.class); job.setJarByClass(WordCountJob.class); // 设置map的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置格式化类 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //设置文件输入路径 TextInputFormat.setInputPaths(job , new Path("file:///E:/data/mapreduce/input")); //设置文件输出路径 TextOutputFormat.setOutputPath(job , new Path("file:///E:/data/mapreduce/output")); //提交作业 job.waitForCompletion(true); } }
-
MapReduce的工作原理
mapreduc作业分为两个阶段
- TextInputFormat通过RecordReader类把按**换行符**来划分,每经过一个换行符就读取一行,把这一行交给map
- ==map阶段(Mapper)==
- ==reduce阶段(Reducer)==
- TextOutputFormat输出结果