该例子实现了 hdfs 中某目录下所有文件中出现单词数量的统计。
一共使用到了三个java类:
WordcountMapper 负责声明 mapTask
WordcountReducer 负责声明 ReduceTask
WordcountDriver 负责向 yarn 提交任务。
相关代码
Mapper
package com.roadom;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Map task for the word count job. The four generic parameters mean:
 *
 * KEYIN:   byte offset of the current line within the input split; Hadoop's
 *          compact serializable LongWritable is used instead of Long.
 * VALUEIN: the text of the current line (Text instead of String).
 * KEYOUT:  mapper output key — a single word (Text).
 * VALUEOUT: mapper output value — the count 1 for that occurrence (IntWritable).
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reusable output objects: avoids allocating two Writables per emitted word.
    private static final IntWritable ONE = new IntWritable(1);
    private final Text wordOut = new Text();

    /**
     * Called by the framework once per input line; emits a <word, 1> pair for
     * every token on the line.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the Text the framework hands us into a plain String.
        String line = value.toString();
        // Split on runs of whitespace ("\\s+"), not a single space, so tabs
        // and consecutive blanks do not produce empty tokens.
        for (String word : line.split("\\s+")) {
            // split() still yields one empty leading token when the line
            // starts with whitespace — skip it.
            if (word.isEmpty()) {
                continue;
            }
            wordOut.set(word);
            // Emit <word, 1>; the shuffle groups identical keys so every
            // occurrence of a word reaches the same reduce task.
            context.write(wordOut, ONE);
        }
    }
}
Reducer
package com.roadom;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce task for the word count job. The generics mirror the mapper:
 * KEYIN/VALUEIN match the mapper's KEYOUT/VALUEOUT (Text / IntWritable);
 * KEYOUT/VALUEOUT are the final result types — a word and its total count.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reusable output object: avoids allocating one IntWritable per key.
    private final IntWritable totalOut = new IntWritable();

    /**
     * Called once per distinct word. {@code values} iterates over every 1
     * emitted for that word, e.g. for input
     * <hello,1><hello,1><hello,1> the key is "hello" and the values are 1,1,1.
     * Sums the values and writes <word, total>.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        totalOut.set(count);
        // Emit the total number of occurrences of this word.
        context.write(key, totalOut);
    }

    // Note: the base class already provides a no-op cleanup(); the previous
    // empty override added nothing and has been removed.
}
Driver。负责向yarn提交这次mapReduce任务。
package com.roadom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Client for the yarn cluster: configures the MapReduce job (jar, mapper and
 * reducer classes, key/value types, input/output paths) and submits it.
 */
public class WordcountDriver {

    /**
     * Entry point.
     *
     * @param args args[0] = HDFS input directory, args[1] = HDFS output
     *             directory (must not already exist).
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of crashing later with an
        // ArrayIndexOutOfBoundsException: args[1] is read below, so two
        // arguments are required (the old check only rejected zero).
        if (args == null || args.length < 2) {
            System.err.println("Usage: WordcountDriver <input path> <output path>");
            System.exit(2);
        }
        // Reads the Hadoop configuration from the environment by default;
        // entries can be overridden with conf.set(...) if needed.
        Configuration conf = new Configuration();
        // A Job is the client-side abstraction of one yarn application.
        Job job = Job.getInstance(conf);
        /*job.setJar("/home/hadoop/wc.jar");*/
        // Locate the jar to ship to the cluster via this driver class.
        job.setJarByClass(WordcountDriver.class);
        // Mapper / reducer implementations for this job.
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // Map-output key/value types; must match the mapper's generic signature.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output key/value types (the reducer's output types).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Directory containing the raw input files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory for the results; the job fails if it already exists.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /*job.submit();*/
        // Submit the job and block until completion, streaming progress logs;
        // exit 0 on success, 1 on failure.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
pom.xml
<!-- Hadoop client dependencies; both versions must match the cluster (2.6.5 here). -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<!-- Provides the org.apache.hadoop.mapreduce.* classes used by all three source files. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.5</version>
</dependency>
</dependencies>
<build>
<!-- Name of the produced jar: target/testWordcount.jar -->
<finalName>testWordcount</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!-- Main-Class entry written into the jar's MANIFEST.MF, so "hadoop jar"
     can be run without naming the driver class on the command line. -->
<mainClass>com.roadom.WordcountDriver</mainClass>
</manifest>
</archive>
<classesDirectory>
</classesDirectory>
</configuration>
</plugin>
</plugins>
</build>
打包提交
执行 maven 的 package,jar 包将在项目 target 目录中产生。
将其上传到集群任意一台机器上去。
执行步骤
1、启动 hdfs 并建立目录,上传文件
#启动hadoop
start-dfs.sh
#...
#创建一个多层级的目录结构
hadoop fs -mkdir -p /wordcount/input
#将本地文件a.txt上传至目录中。也可以上传多个文件
hadoop fs -put a.txt /wordcount/input
2、启动 yarn
start-yarn.sh
#...
3、运行 jar 包
#hadoop jar jar包全路径 [主类全称] [主类所需参数列表]
#由于jar包中有mainclass,因此无需在jar包参数后再增加主类名
hadoop jar testWordcount.jar /wordcount/input /wordcount/output
该种运行方式与传统的 java -cp 形式没有本质区别。无非是在运行的过程中不需要在 -cp 后面指定 hadoop 所有的依赖 jar。
但是如果程序中除了 hadoop 以外还依赖有其他第三方 jar包,则还是需要在 -cp 后面罗列出来。
java -cp 主类所在jar包全路径及所有依赖jar包全路径 主类全称 [主类所需参数列表]
或者,选择在打包的过程中,将所有依赖的第三方jar包打到主jar包中去。然后直接使用 java -jar 运行 runnable jar
java -jar wc.jar /wordCount/input /wordCount/output
1、需要将集群的hadoop自定义配置文件拷贝到jar包中去
2、需要在 main 方法中指定 wc.jar 即将存在的位置。
//job.setJarByClass(WordcountDriver.class); //该种方式将不再能找到类。
job.setJar("/home/hadoop/wc.jar");
运行日志
18/04/24 01:24:00 INFO client.RMProxy: Connecting to ResourceManager at centos00/192.168.2.100:8032
18/04/24 01:24:01 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/24 01:24:01 INFO input.FileInputFormat: Total input paths to process : 3
18/04/24 01:24:02 INFO mapreduce.JobSubmitter: number of splits:3
18/04/24 01:24:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1524496863055_0002
18/04/24 01:24:02 INFO impl.YarnClientImpl: Submitted application application_1524496863055_0002
18/04/24 01:24:02 INFO mapreduce.Job: The url to track the job: http://centos00:8088/proxy/application_1524496863055_0002/
18/04/24 01:24:02 INFO mapreduce.Job: Running job: job_1524496863055_0002
18/04/24 01:24:15 INFO mapreduce.Job: Job job_1524496863055_0002 running in uber mode : false
18/04/24 01:24:15 INFO mapreduce.Job: map 0% reduce 0%
18/04/24 01:24:41 INFO mapreduce.Job: map 100% reduce 0%
18/04/24 01:24:52 INFO mapreduce.Job: map 100% reduce 100%
18/04/24 01:24:53 INFO mapreduce.Job: Job job_1524496863055_0002 completed successfully
18/04/24 01:24:53 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=201730
FILE: Number of bytes written=832609
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=101534
HDFS: Number of bytes written=30057
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=68494
Total time spent by all reduces in occupied slots (ms)=8225
Total time spent by all map tasks (ms)=68494
Total time spent by all reduce tasks (ms)=8225
Total vcore-milliseconds taken by all map tasks=68494
Total vcore-milliseconds taken by all reduce tasks=8225
Total megabyte-milliseconds taken by all map tasks=70137856
Total megabyte-milliseconds taken by all reduce tasks=8422400
Map-Reduce Framework
Map input records=2030
Map output records=16756
Map output bytes=168212
Map output materialized bytes=201742
Input split bytes=337
Combine input records=0
Combine output records=0
Reduce input groups=2401
Reduce shuffle bytes=201742
Reduce input records=16756
Reduce output records=2401
Spilled Records=33512
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=1219
CPU time spent (ms)=3760
Physical memory (bytes) snapshot=498151424
Virtual memory (bytes) snapshot=8303079424
Total committed heap usage (bytes)=375455744
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=101197
File Output Format Counters
Bytes Written=30057
4、查看运行结果
hadoop fs -cat /wordcount/output/part-r-00000