mapreduce基础复习:
1. mapreduce流程
job-->TaskTracker-->Map/Reduce Task
resource manager-->app master-->container-->
scheduler执行计划
split-->map-->combiner-->partitioner-->reducer-->输出到HDFS
2. hadoop1.0和hadoop2.0的区别
1.0管理和计算都是由mapreduce来完成
2.0管理由yarn负责,不仅仅减轻mapreduce的计算压力,而且使得计算框架可扩展
3. driver类的核心逻辑
Job对象-->指定driver类为驱动类-->设置mapper和reducer/[partitioner]-->设置mapper和reducer的输出-->设置输入输出文件路径-->运行
4. 文件关联的逻辑
mapper端负责把文件合并,并且根据业务逻辑,把相同的key归类
reducer端负责根据相同的key来进行关联
在idea中实现wordcount词频统计:
需求分析:
1.Map过程:并行读取文本,对读取的单词进行map操作,每个词都以<key,value>形式生成
2.Reduce操作是对map的结果进行排序合并最后得出词频
1.添加maven依赖
注意:hadoop-core依赖与hadoop-auth冲突,需要先把hadoop-core注释掉。
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
2.编写mapper类
1.将文本转化成字符串
2.将字符串按指定规则切割
3.循环遍历,将每一个单词写出去
package cn.kgc.kb09.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @Author: ChaoKeAiMuZhi
* @Date: 2020/9/10 9:18
* @Description:
* KEYIN:输入的key类型
* VALUEIN:输入的value类型
* KEYOUT:输出的key类型
* VALUEOUT:输出的value类型
**/
public class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println(key);
//1.将文本转化成字符串
String line = value.toString();
//2.将字符串按指定规则切割
String[] words = line.split(" ");
//3.循环遍历,将每一个单词写出去
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
3.编写reduce类
reduce端收到的类型是key,迭代器
1.对迭代器进行累加求和
2.将key和value进行写出
package cn.kgc.kb09.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @Author: ChaoKeAiMuZhi
* @Date: 2020/9/10 9:39
* @Description:
**/
public class WCReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//reduce端接收到的类型大概是这样的 (wish,(1,1,1,1))
//对迭代器进行累加求和
//total必须赋值为0初始化,因为reduce方法是每个键都会执行一次
int total=0;
for (IntWritable value : values) {
total+=value.get();
}
//将key和value进行写出
context.write(key,new IntWritable(total));
}
}
4.有可能还需要编写partitioner类分区
1.根据key的hashCode分区
package cn.kgc.kb09.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @Author: ChaoKeAiMuZhi
* @Date: 2020/9/10 12:23
* @Description:
**/
public class WCPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
return Math.abs(text.hashCode())%i;
}
}
5.编写Driver类执行Job
1.创建配置文件,创建Job
2.设置jar的位置,参数为本类类名.class
3.设置map和reduce的位置
4.设置map输出端的key,value类型
5.设置reduce输出的key,value类型
6.设置输入和输出路径,输入的是本地自己建的txt文件,会输出一个test目录
7.提交程序运行
package cn.kgc.kb09.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @Author: ChaoKeAiMuZhi
* @Date: 2020/9/10 9:47
* @Description:
**/
public class WCDriver {
public static void main(String[] args) throws Exception {
//1.建立连接
Configuration cfg = new Configuration();
Job job= Job.getInstance(cfg,"job_wc");
job.setJarByClass(WCDriver.class);
//2.指定mapper和reducer
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//指定mapper的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置partitioner
job.setNumReduceTasks(4);
job.setPartitionerClass(WCPartitioner.class);
//指定reducer的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//3.运行
boolean result = job.waitForCompletion(true);
System.out.println(result?"成功":"失败");
System.exit(result?0:1);
}
}