mr整体流程
maptask
调用FileInputFormat的createRecordReader(底层lineRecordReader)读取分片数据
每行数据读取一次,返回一个(K,V)对,K是offset,V是一行数据
将k-v对交给maptask处理
每对k-v调用一次map(K,V,context)方法,然后context.write(k,v)
写出的数据交给收集器OutputCollector.collector()处理
将数据写入环形缓冲区,并记录写入的起始偏移量,终止偏移量,环形缓冲区默认大小100M
默认写到80%的时候要溢写磁盘,溢写磁盘的过程中数据继续写入剩余20%
环形缓冲区(在80%时开始溢写(锁定这80的空间,同时通过(索引(key的开始,value的开始,value的结束,p的信息))
溢写磁盘之前要先进行分区然后分区内进行排序
默认的分区规则是hashpatitioner,即key的hash%reduceNum
默认的排序规则是key的字典顺序,使用的是快速排序
溢写会形成多个文件,在maptask读取完一个分片数据后,先将环形缓冲区数据刷写到磁盘
将数据多个溢写文件进行合并,分区内排序(外部排序===》归并排序)
reduceTask执行详解
数据按照分区规则发送到reducetask
reducetask将来自多个maptask的数据进行合并,排序(外部排序===》归并排序)
按照key相同分组()
一组数据调用一次reduce(k,iterablevalues,context)
处理后的数据交由reducetask
reducetask调用FileOutputFormat组件
FileOutputFormat组件中的write方法将数据写出
shuffle概念
shuffle过程从map写数据到环形缓冲区到reduce读取数据合并(见maptask和reducetask执行过程)
洗牌打乱
Collections.shuffle(List):
mr的shuffle:map阶段输出到reduce输入的这段过程
buffer in memory:圆形缓冲区(80M兆溢写)
根据key快排:
合并:根据分区合并
默认分区:1 (hashcode%1恒等于0)
reduce去map的分区合并的数据拉去自己的(分区的reduce)的数组(appmaster告知)
reduce在拉取的过程(多个map的)中合并(排序)一个临时文件
part-r-00000:-r执行过reduce:part:分区
map的执行结果在圆形缓冲区中默认是100M,阈值是80M.
OutputCollection.collector():收集器
合并小文件(归并)
80M一溢写可能会有大量的80M的小文件
切片:
fileinputformat:中计算切片信息(getspilt)
切片信息:
本地运行
本地提交(集群运行) debug 用于本地模拟生产环境
.cross-platform.
conf.setBoolean(“mapreduce.app-submission.cross-platform”, true);
这是那个参数
那个配置文件
集群提交集群运行 :用于测试,生产,研发
mr的数据类型
Text;
intWritable
VintWritable
longWritable
vlongWritable
BooleanWritable
NullWritable
MapWritable<>;
java有mr没有的
short
自定义组件combiner
1.Combiner是MR程序中Mapper和Reduce之外的一种组件
2.Combiner组件的父类就是Reducer
3.Combiner和Reducer之间的区别在于运行的位置
4.Reducer是每一个接收全局的Map Task 所输出的结果
5.Combiner是在MapTask的节点中运行
6.每一个map都会产生大量的本地输出,Combiner的作用就是对map输出的结果先做一次合并,以较少的map和reduce节点中的数据传输量
7.Combiner的存在就是提高当前网络IO传输的性能,也是MapReduce的一种优化手段。
8.Combiner的具体实现:
wordcount案例中直接调用写好的reduce,即在job的设置中加入
job.setCombinerClass(WordCountReduce.class);
public class WordCountRunner {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
//获取job并携带参数
Job job = Job.getInstance(conf,"wordCount");
//可以用job对象封装一些信息
//首先程序是一个jar包,就要指定jar包的位置
//将jar包放在root目录下
//可以将这个程序打包为pv.jar,上传到linux机器上
//使用命令运行
//hadoop jar /root/pv.jar pvcount.PvCountRunner /data/pvcount /out/pvcount
//job.setJar("d:/word.jar");
/**
* 一个jar包中可能有多个job,所以我们要指明job使用的是哪儿一个map类
* 哪儿一个reduce类
*/
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
/**
* map端输出的数据要进行序列化,所以我们要告诉框架map端输出的数据类型
*/
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
/**
* reduce端要输出,所有也要指定数据类型
*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/**
* 告诉框架用什么组件去读数据,普通的文本文件,就用TextInputFormat
* 导入长包
*/
job.setInputFormatClass(TextInputFormat.class);
/**
* 告诉这个组件去哪儿读数据
* TextInputFormat有个父类FileInputFormat
* 用父类去指定到哪儿去读数据
* 输入路径是一个目录,该目录下如果有子目录需要进行设置递归遍历,否则会报错
*/
FileInputFormat.addInputPath(job,new Path(args[0]));
//设置写出的组件
job.setOutputFormatClass(TextOutputFormat.class);
//设置写出的路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setCombinerClass(WordCountReduce.class);
/**
* 信息设置完成,可以调用方法向yarn去提交job
* waitForCompletion方法就会将jar包提交给RM
* 然后rm可以将jar包分发,其他机器就执行
*/
//传入一个boolean类型的参数,如果是true,程序执行会返回true/flase
//如果参数传入true,集群在运行时会有进度,这个进度会在客户端打印
boolean res = job.waitForCompletion(true);
/**
* 客户端退出后会返回一个状态码,这个状态码我们可以写shell脚本的时候使用
* 根据返回的状态码的不同区执行不同的逻辑
*/
System.exit(res? 0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Hadoop序列化类型,以及自定义类型
MapReduce序列化,序列化是指将结构化对象转为字节流以便于通过网络进行传输或写入持久存储的过程。反序列化指的是将字节流转为结构化对象的过程。在Hadoop MapReduce中,序列化的主要作用有两个:永久存储和进程间通信。
为了能够读取或者存储Java对象,MapReduce编程模型要求用户输入和输出数据中的key和value必须是可序列化的。在Hadoop MapReduce中,使一个Java对象可序列化的方法是让其对应的类实现Writable接口。但对于key而言,由于它是数据排序的关键字,因此还需要提供比较两个key对象的方法。为此,key对应类需实现WritableComparable接口
(自定义数据类型)
需求:对于记录用户手机信息的文件,得出统计每一个用户(手机号)所耗费的总上行流量、下行流量,总流量结果
实现思路:实现自定义的 bean 来封装流量信息,使用手机号码作为Key,Bean作为value。这个Bean的传输需要实现可序列化,Java类中提供的序列化方法中会由很多冗余信息(继承关系,类信息)是我们不需要的,而这些信息在传输中占据大量资源,会导致有效信息传输效率减低。因此我们需要实现MapReduce的序列化接口Writable,自定义方法实现。
计算上行流量、下行流量、计费流量
取一行TextInputFormat类去读取,offset做key,一行数据做value,拆分,取出倒数第二倒数第三段
map端读取数据然后输出
offset:phoneNum,upflow,downflow
phoneNum:upfolw,downflow
phoneNum:{upfolw_downflow;upfolw1_downflow2}
phoneNum:totalupflow,totaldownflow,totalflow
考虑把字符串拼接的方式存储数据改写成类去存储数据,这个类三个属性,upflow,downflow,totalflow,这些数据在hadoop框架要进行序列化和反序列化
所以要实现writable接口,重写序列化和反序列方法
public class FlowBean implements Writable{
long upflow;
long downflow;
long sumflow;
//如果空参构造函数被覆盖,一定要显示定义一下,否则在反序列化时会抛出异常
public flowBean() {
}
public flowBean(long upflow, long downflow) {
this.upflow = upflow;
this.downflow = downflow;
this.sumflow=upflow+downflow;
}
public long getUpflow() {
return upflow;
}
public void setUpflow(long upflow) {
this.upflow = upflow;
}
public long getDownflow() {
return downflow;
}
public void setDownflow(long downflow) {
this.downflow = downflow;
}
public long getSumflow() {
return sumflow;
}
public void setSumflow(long sumflow) {
this.sumflow = sumflow;
}
@Override
public String toString() {
return upflow + "\t" + downflow + "\t" + sumflow;
}
//序列化,将对象的字段信息写入输出流
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(downflow);
out.writeLong(sumflow);
}
//反序列化,从输入流读取各字段的信息
@Override
public void readFields(DataInput in) throws IOException {
upflow=in.readLong();
downflow=in.readLong();
sumflow=in.readLong();
}
}
public class flowCount {
static class MyMapper extends Mapper<LongWritable,Text,Text,flowBean>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String phoneNum = fields[1];
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
flowBean bean = new flowBean(upFlow,downFlow);
context.write(new Text(phoneNum),bean);
}
}
static class MyReducer extends Reducer<Text,flowBean,Text,flowBean>{
@Override
protected void reduce(Text key, Iterable<flowBean> values, Context context) throws IOException, InterruptedException {
Iterator<flowBean> it = values.iterator();
long upflow=0;
long downflow=0;
while (it.hasNext()){
flowBean bean = it.next();
upflow+=bean.getUpflow();
downflow+=bean.getDownflow();
}
flowBean total = new flowBean(upflow, downflow);
context.write(key,total);
}
}
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
//1、配置连接hadoop集群的参数
Configuration conf = new Configuration();
// conf.set("fs.defaultFS","hdfs://qianfeng");
conf.set("fs.defaultFS","file:///");
conf.set("mapreduce.framework.name","local");
//2、获取job对象实例
Job job = Job.getInstance(conf,"FLOWCOUNT");
//3、指定本业务job的路径
job.setJarByClass(flowCount.class);
//4、指定本业务job要使用的Mapper类
job.setMapperClass(MyMapper.class);
//5、指定mapper类的输出数据的kv的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(flowBean.class);
//6、指定本业务job要使用的Reducer类
job.setReducerClass(MyReducer.class);
//7、设置程序的最终输出结果的kv的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(flowBean.class);
//8、设置job要处理的数据的输入源
FileInputFormat.setInputPaths(job,new Path("/data/flowinput"));
//判断输出目录是否存在,如果存在,则删除之
//9、设置job的输出目录
FileOutputFormat.setOutputPath(job,new Path("/out/flowoutput"));
//10、提交job
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
自定义分区:
[外链图片转存失败(img-255KxceZ-1568967115064)(D:/新机/千峰笔记/1567677569296.png)]
二次排序
[外链图片转存失败(img-mMgqKVYW-1568967115065)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1567568985274.png)]
行比较器:raw
map端的jion
使用场景:有一个或者多个小表(文件)
map端的join:
加缓存:
在job中。job.setcacheFiles():url数组
job.addcacheFile():url单个路径
定义map存值
在setup中使用流读文件放入map中
public Map<String, String> sexMap = new ConcurrentHashMap<String, String>();
public Map<String, String> userMap = new ConcurrentHashMap<String, String>();
/**
* 只在map阶段执行一次 setup()
*
* 此方法被MapReduce框架仅且执行一次,在执行Map任务前, 进行相关变量或者资源的集中初始化工作。 若是将资源初始化工作放在方法map()中,
* 导致Mapper任务在解析每一行输入时都会进行资源初始化工作, 导致重复,程序运行效率不高! cleanup()
* 此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后, 进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,
* 也会导致Mapper任务在解析、处理每一行文本后释放资源, 而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!
*/
/**
* 定义俩个内存数据结构来存储缓存文件的内容
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 首先获取缓存文件数组
// context.getLocalCacheFiles();
//DistributedCache.getLocalCacheFiles(new Configuration());
Path[] paths = context.getLocalCacheFiles();
for (Path path : paths) {
String filename = path.getName();
BufferedReader br = null;
if (filename.equals("sex")) {
br = new BufferedReader(new FileReader(new File(path.getName())));
String str = null;
while ((str = br.readLine()) != null) {
String strs[] = str.split("\t");
sexMap.put(strs[0], strs[1]);
}
// 关闭流
br.close();
} else if (filename.equals("user")) {
br = new BufferedReader(new FileReader(new File(path.getName())));
String str = null;
while ((str = br.readLine()) != null) {
String strs[] = str.split("\t");
userMap.put(strs[0], strs[1]);
}
// 关闭流
br.close();
}
}
}
reduce的jion
连续的job
// 驱动
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取配置对象信息,和对配置对象进行设置(没有就不用了)获取Job对象
Job grepjob = Job.getInstance(new Configuration(), "grep job");
// 设置Job的运行主类
grepjob.setJarByClass(DeperDemo.class);
// 对map阶段进行设置
grepjob.setMapOutputKeyClass(Text.class);
grepjob.setMapOutputValueClass(Text.class);
grepjob.setMapperClass(GrepMapper.class);
FileInputFormat.addInputPath(grepjob, new Path("/GrepWords"));
FileOutputFormat.setOutputPath(grepjob, new Path("/output"));
// 依赖的job
Job countjob = Job.getInstance(new Configuration(), "Count job");
countjob.setJarByClass(DeperDemo.class);
countjob.setMapOutputKeyClass(Text.class);
countjob.setMapOutputValueClass(IntWritable.class);
countjob.setMapperClass(CountMapper.class);
FileInputFormat.addInputPath(countjob, new Path("/output"));
// 对reduce阶段设置
countjob.setOutputKeyClass(Text.class);
countjob.setOutputValueClass(IntWritable.class);
countjob.setReducerClass(CountReducer.class);
FileOutputFormat.setOutputPath(countjob, new Path("/output/out00"));
// 创建单个作业控制器
ControlledJob gcj = new ControlledJob(grepjob.getConfiguration());
ControlledJob ccj = new ControlledJob(countjob.getConfiguration());
// 再添加依赖
ccj.addDependingJob(gcj);
// 定义一个总的作业控制器
JobControl jc = new JobControl("grepjob and countjob");
// 将单个作业控制器放入总的作业控制器中
jc.addJob(gcj);
jc.addJob(ccj);
// 获取一个线程
Thread th = new Thread(jc);
// 启动线程
th.start();
// 判断job是否完全运行完成
if (jc.allFinished()) {
Thread.sleep(1000);
th.stop();
jc.stop();
System.exit(0);
}
}
}
discp:集群之间的copy。使用mr进行copy
常用命令:
hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo
bash$ hadoop distcp hdfs://nn1:8020/foo/a hdfs://nn1:8020/foo/b hdfs://nn2:8020/bar/foo
hadoop distcp -f hdfs://nn1:8020/srclist hdfs://nn2:8020/bar/foo
hadoop distcp -overwrite hdfs://nn1:8020/srclist hdfs://nn2:8020/bar/foo
操作选项:
[root@hadoop01 hadoop-2.7.1]# hadoop distcp
usage: distcp OPTIONS [source_path...] <target_path>
OPTIONS
-append Reuse existing data in target files and append new
data to them if possible
-async Should distcp execution be blocking
-atomic Commit all changes or none
-bandwidth <arg> Specify bandwidth per map in MB
-delete Delete from target, files missing in source
-diff <arg> Use snapshot diff report to identify the
difference between source and target
-f <arg> List of files that need to be copied
-filelimit <arg> (Deprecated!) Limit number of files copied to <= n
-i Ignore failures during copy
-log <arg> Folder on DFS where distcp execution logs are
saved
-m <arg> Max number of concurrent maps to use for copy
-mapredSslConf <arg> Configuration for ssl config file, to use with
hftps://
-overwrite Choose to overwrite target files unconditionally,
even if they exist.
-p <arg> preserve status (rbugpcaxt)(replication,
block-size, user, group, permission,
checksum-type, ACL, XATTR, timestamps). If -p is
specified with no <arg>, then preserves
replication, block size, user, group, permission,
checksum type and timestamps. raw.* xattrs are
preserved when both the source and destination
paths are in the /.reserved/raw hierarchy (HDFS
only). raw.* xattrpreservation is independent of
the -p flag. Refer to the DistCp documentation for
more details.
-sizelimit <arg> (Deprecated!) Limit number of files copied to <= n
bytes
-skipcrccheck Whether to skip CRC checks between source and
target paths.
-strategy <arg> Copy strategy to use. Default is dividing work
based on file sizes
-tmp <arg> Intermediate work path to be used for atomic
commit
-update Update target, copying only missingfiles or
directories
more details.
-sizelimit (Deprecated!) Limit number of files copied to <= n
bytes
-skipcrccheck Whether to skip CRC checks between source and
target paths.
-strategy Copy strategy to use. Default is dividing work
based on file sizes
-tmp Intermediate work path to be used for atomic
commit
-update Update target, copying only missingfiles or
directories