目录
1.单词统计
顾名思义,单词统计用于统计大规模数据中每个单词出现的次数,是一个经典的MapReduce程序。
1.1 实例描述
对输入文件出现的单词进行统计,将结果输出给一个新文件。
样本输入:
1)java hadoop
hbase hadoop java
hive hotspot
2) hadoop hotspot
hive java spark
样本输出:
hadoop 3, hbase 1, hive 2, hotspot 2, java 3, spark 1(实际输出文件中每行为“单词<Tab>次数”)
1.2 设计思路
map阶段的输入key为LongWritable(该行在文件中的起始偏移量),输入value为Text(一行文本)。map将每行按空格切分成单词,对每个单词输出<单词, 1>。reduce阶段对同一个key(单词)调用一次reduce,遍历该key对应的value迭代器并累加,把累加结果作为reduce输出的value,从而得到每个单词的出现次数。
1.3 程序代码
package cn.itcast.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the word-count job.
 *
 * <p>Splits each input line into words on runs of whitespace and emits {@code <word, 1>}
 * for every non-empty word. The input key (the line's byte offset) is not used.
 */
public class wordcountmapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** The constant count emitted for every word occurrence. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Output key holder, refilled for every word. */
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // split(" ") yields empty strings for doubled spaces, tabs or a leading space, and
        // those would be counted as a "word"; split on any whitespace run and skip empties.
        for (String token : value.toString().split("\\s+")) {
            if (token.isEmpty()) {
                continue; // a line starting with whitespace produces one leading empty token
            }
            word.set(token);
            context.write(word, ONE);
        }
    }

    /** Unused stub entry point; the job is launched by wordcountdrive. */
    public static void main(String[] args) {
        // intentionally empty
    }
}
package cn.itcast.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer for the word-count job: adds up the partial counts emitted for one word and
 * writes {@code <word, total>}.
 */
public class wordcountreduce extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    /** Output value holder, refilled for every key. */
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable partial : values) {
            sum = sum + partial.get();
        }
        total.set(sum);
        context.write(key, total);
    }

    /** Unused stub entry point; the job is launched by wordcountdrive. */
    public static void main(String[] args) {
        // intentionally empty
    }
}
package cn.itcast.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the word-count job.
 *
 * <p>Usage: {@code wordcountdrive <input path> <output path>}. The process exits with 0
 * when the job succeeds, 1 when it fails, and 2 when the arguments are missing.
 */
public class wordcountdrive {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: wordcountdrive <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(wordcountdrive.class);
        job.setMapperClass(wordcountmapper.class);
        // Summing counts is associative and commutative, so the reducer can also run as a
        // map-side combiner to shrink the data shuffled to the reducers.
        job.setCombinerClass(wordcountreduce.class);
        job.setReducerClass(wordcountreduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
2.数据去重
"数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。下面就进入这个实例的MapReduce程序设计。
2.1 实例描述
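对输入文件中的数据进行去重:程序以整行作为一条数据,原始数据中重复出现的行在输出文件中只保留一次。
样本输入:
1)
66 55 23 23 55
2)
12 23 12 66
(注:程序按整行比较,上面每个文件中的这一行会被当作一条完整数据;若要对单个数字去重,应将每个数字单独放在一行。)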
2.2 设计思路
数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,而对value-list则没有要求。当reduce接收到一个<key,value-list>时就直接将key复制到输出的key中,并将value设置成空值。
在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后会交给reduce。所以从设计好的reduce输入可以反推出map的输出key应为数据,value任意。继续反推,map输出数据的key为数据,而在这个实例中每个数据代表输入文件中的一行内容,所以map阶段要完成的任务就是在采用Hadoop默认的作业输入方式之后,将value设置为key,并直接输出(输出中的value任意)。map中的结果经过shuffle过程之后交给reduce。reduce阶段不会管每个key有多少个value,它直接将输入的key复制为输出的key,并输出就可以了(输出中的value被设置成空了)。
2.3 程序代码
package cn.cast.hadoop;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the de-duplication job.
 *
 * <p>Each input line is forwarded unchanged as the output key, with an empty {@link Text}
 * as the value, so all copies of the same line are grouped into one reduce call. The input
 * key (the line's byte offset) is not used.
 */
public class RemoveRepeatmapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Placeholder value; only the key carries information in this job. */
    private final Text placeholder = new Text("");

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, placeholder);
    }

    /** Unused stub entry point; the job is launched by RemoveRepeatdrive. */
    public static void main(String[] args) {
    }
}
package cn.cast.hadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the de-duplication job: writes each distinct key exactly once with an empty
 * value, however many values were grouped under it.
 */
public class RemoveRepeatreducer extends Reducer<Text, Text, Text, Text> {

    /** Empty output value; only the key matters. */
    private final Text blank = new Text("");

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // The grouped values are deliberately not read: one output per key is the whole point.
        context.write(key, blank);
    }

    /** Unused stub entry point; the job is launched by RemoveRepeatdrive. */
    public static void main(String[] args) {
    }
}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RemoveRepeatdrive {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(RemoveRepeatdrive.class);
job.setMapperClass(RemoveRepeatmapper.class);
job.setReducerClass(RemoveRepeatreducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b=job.waitForCompletion(true);
System.exit(b?0:1);
}
}
3.班级学科平均分数
"平均成绩"的主要目的仍是重温经典的"WordCount"例子,可以看作在其基础上的一个小变化版。本实例计算每个学科在所有学生中的平均成绩。
3.1 实例描述
样本输入
1)
张三:
math 88
english 86
history 78
2)
李四:
math 98
english 66
history 82
3.2 设计思路
map读取每行"学科 分数",将学科名作为map输出的key(Text),分数作为map输出的value(IntWritable)。reduce阶段,同一学科的所有分数以迭代器形式传入;遍历迭代器将分数累加,再除以分数的个数(本例有两名学生,即除以2),得到该学科的平均分。由于使用整数除法,结果的小数部分被截断。
3.3 程序代码
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Averagemapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String results[] = line.split(" ");
IntWritable intWritable =new IntWritable();
intWritable.set(Integer.parseInt(results[1]));
String result =results[0];
context.write(new Text(result),intWritable);
}
public static void main(String[] args) {
}
}
package cn.itcast.hadoop;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the per-subject average job.
 *
 * <p>Receives every score emitted for one subject and writes {@code <subject, average>}.
 * The average is the sum of the scores divided by the number of scores. The fractional part
 * is truncated because the output value type is {@link IntWritable}.
 *
 * <p>Must not be used as a combiner: an average of partial averages is generally not the
 * overall average.
 */
public class Averagereducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0; // long so that a large number of scores cannot overflow the running total
        int count = 0;
        for (IntWritable value : values) {
            sum += value.get();
            count++;
        }
        if (count == 0) {
            return; // defensive: nothing to average, avoid division by zero
        }
        // Divide by the real number of scores, not a fixed student count, so the job works for
        // any number of students. The quotient lies within int range because it is an average
        // of ints.
        context.write(key, new IntWritable((int) (sum / count)));
    }

    /** Unused stub entry point; the job is launched by Averagedriver. */
    public static void main(String[] args) {
    }
}
package cn.itcast.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the per-subject average job.
 *
 * <p>Usage: {@code Averagedriver <input path> <output path>}. The process exits with 0
 * when the job succeeds, 1 when it fails, and 2 when the arguments are missing.
 */
public class Averagedriver {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: Averagedriver <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "subject average");
        job.setJarByClass(Averagedriver.class);
        job.setMapperClass(Averagemapper.class);
        // Deliberately no combiner: averaging is not associative, so reusing the reducer on
        // the map side would produce wrong results.
        job.setReducerClass(Averagereducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
参考:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html
持续更新~