MapReduce Feature Implementation series:
MapReduce Feature Implementation 1: converting data between HBase and HDFS
MapReduce Feature Implementation 4: a small exercise (read data from HBase, aggregate it, and write the Top 3 to HDFS in descending order)
MapReduce Feature Implementation 5: deduplication (Distinct) and counting (Count)
MapReduce Feature Implementation 6: maximum (Max), sum (Sum), average (Avg)
MapReduce Feature Implementation 7: a small exercise (computing an average with multiple chained jobs)
MapReduce Feature Implementation 8: partitioning (Partition)
MapReduce Feature Implementation 10: inverted index (Inverted Index)
1. Read data from HBase table 1 and write the aggregated counts to table 2
Create table 1 in HBase:
create 'hello', 'cf'
put 'hello', '1', 'cf:hui', 'hello world'
put 'hello', '2', 'cf:hui', 'hello hadoop'
put 'hello', '3', 'cf:hui', 'hello hive'
put 'hello', '4', 'cf:hui', 'hello hadoop'
put 'hello', '5', 'cf:hui', 'hello world'
put 'hello', '6', 'cf:hui', 'hello world'
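Optionally, a quick scan in the HBase shell should confirm that the six rows are in place:

scan 'hello'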
Java code:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseToHbase {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String hbaseTableName1 = "hello";
        String hbaseTableName2 = "mytb2";

        prepareTB2(hbaseTableName2);

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHbase.class);
        job.setJobName("mrreadwritehbase");

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // Emit the value of the row's first cell with a count of 1.
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), one);
        }
    }

    public static class doReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            System.out.println(key.toString());
            int sum = 0;
            Iterator<IntWritable> haha = values.iterator();
            while (haha.hasNext()) {
                sum += haha.next().get();
            }
            // Use the line content as the row key and write the count to mycolumnfamily:count.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }

    public static void prepareTB2(String hbaseTableName) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
        tableDesc.addFamily(columnDesc);
        Configuration cfg = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(cfg);
        if (admin.tableExists(hbaseTableName)) {
            System.out.println("Table exists, trying drop and create!");
            admin.disableTable(hbaseTableName);
            admin.deleteTable(hbaseTableName);
            admin.createTable(tableDesc);
        } else {
            System.out.println("create table: " + hbaseTableName);
            admin.createTable(tableDesc);
        }
    }
}
Run the code on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class
[hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase
View the mytb2 table:
hbase(main):009:0> scan 'mytb2'
ROW                 COLUMN+CELL
 hello hadoop       column=mycolumnfamily:count, timestamp=1489817182454, value=2
 hello hive         column=mycolumnfamily:count, timestamp=1489817182454, value=1
 hello world        column=mycolumnfamily:count, timestamp=1489817182454, value=3
3 row(s) in 0.0260 seconds
2. Read data from HBase table 1 and write the results to HDFS
2.1 Output the contents of table 1 without counting:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseToHdfs {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tablename = "hello";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        Job job = new Job(conf, "WordCountHbaseReader");
        job.setJarByClass(HbaseToHdfs.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename, scan, doMapper.class, Text.class, Text.class, job);
        job.setReducerClass(WordCountHbaseReaderReduce.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), new Text("one"));
        }
    }

    public static class WordCountHbaseReaderReduce extends Reducer<Text, Text, Text, NullWritable> {
        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(key, NullWritable.get());
            }
        }
    }
}
Run the code on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseToHdfs.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseToHdfs*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseToHdfs /output
Note: the /output directory must not already exist; if it does, delete it first.
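If it is left over from an earlier run, it can be removed with, for example (on Hadoop 2; older releases use hadoop fs -rmr):

[hadoop@h71 q1]$ hadoop fs -rm -r /output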
[hadoop@h71 q1]$ hadoop fs -ls /output
Found 2 items
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 14:28 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 73 2017-03-18 14:28 /output/part-r-00000
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hello hadoop
hello hadoop
hello hive
hello world
hello world
hello world
2.2 Output the contents of table 1 with word counts:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseToHdfs1 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tablename = "hello";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        Job job = new Job(conf, "WordCountHbaseReader");
        job.setJarByClass(HbaseToHdfs1.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename, scan, doMapper.class, Text.class, IntWritable.class, job);
        job.setReducerClass(WordCountHbaseReaderReduce.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            /*
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), one);
            */
            String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
            for (String str : rowValue) {
                word.set(str);
                context.write(word, one);
            }
        }
    }

    public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int total = 0;
            // Every map output value is 1, so counting the elements equals summing them.
            for (IntWritable val : values) {
                total++;
            }
            context.write(key, new IntWritable(total));
        }
    }
}
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hadoop 2
hello 6
hive 1
world 3
3. Read an HDFS file and write the counted results to an HBase table
Create the file and upload it to HDFS:
[hadoop@h71 q1]$ vi hello.txt
hello world
hello hadoop
hello hive
hello hadoop
hello world
hello world
[hadoop@h71 q1]$ hadoop fs -mkdir /input
[hadoop@h71 q1]$ hadoop fs -put hello.txt /input
Java code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HdfsToHBase {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable i = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // TextInputFormat already delivers one line per record, so the split is a no-op safeguard.
            String s[] = value.toString().trim().split("\n");
            for (String m : s) {
                context.write(new Text(m), i);
            }
        }
    }

    public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            // column family cf, column count, value is the count
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }

    public static void createHBaseTable(String tableName) throws IOException {
        HTableDescriptor htd = new HTableDescriptor(tableName);
        HColumnDescriptor col = new HColumnDescriptor("cf");
        htd.addFamily(col);
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tableName)) {
            System.out.println("table exists, trying to recreate table......");
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        System.out.println("create new table:" + tableName);
        admin.createTable(htd);
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // name of the HBase table that receives the results
        String tableName = "mytb2";
        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        createHBaseTable(tableName);
        String input = args[0];
        Job job = new Job(conf, "WordCount table with " + input);
        job.setJarByClass(HdfsToHBase.class);
        job.setNumReduceTasks(3);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));
        // FileInputFormat.setInputPaths(job, new Path(input)); // this works as well
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHBase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHBase*class
[hadoop@h71 q1]$ hadoop jar xx.jar HdfsToHBase /input/hello.txt
hbase(main):011:0> scan 'mytb2'
ROW                 COLUMN+CELL
 hello hadoop       column=cf:count, timestamp=1489819702236, value=2
 hello hive         column=cf:count, timestamp=1489819702236, value=1
 hello world        column=cf:count, timestamp=1489819704448, value=3
3 row(s) in 0.3260 seconds
4. From HDFS to HDFS (this is simply the classic MapReduce WordCount example)
Java code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HdfsToHdfs {

    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String str : words) {
                word.set(str);
                context.write(word, one);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable val : values) {
                total++;
            }
            context.write(key, new IntWritable(total));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = new Job(conf, "word count");
        job.setJarByClass(HdfsToHdfs.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHdfs.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHdfs*class
[hadoop@h71 q1]$ hadoop jar xx.jar HdfsToHdfs /input/hello.txt /output
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hadoop 2
hello 6
hive 1
world 3
Note: this WordCount example uses the Hadoop 2 API; my other article (http://blog.csdn.net/m0_37739193/article/details/71132652) contains the Hadoop 1 version. Hadoop 0.20.0 and later ships both versions of the API, so code written against either will run.
Differences between the old and new Hadoop MapReduce APIs:
Hadoop 0.20.0 introduced a new Java MapReduce API, sometimes called the "context object" API, designed to make the API easier to evolve in the future. The new API is not type-compatible with the old one, so existing applications must be rewritten to take advantage of it.
The new API favors abstract classes over interfaces because they are easier to evolve: a method with a default implementation can be added to an abstract class without touching existing implementations. In the new API, Mapper and Reducer are both abstract classes.
-- An interface is a strict contract: it contains only method declarations, no implementations, and every implementing class (abstract classes aside) must implement every method.
-- An abstract class is a looser contract: it can provide default implementations for some methods, and subclasses decide whether to override them. This makes abstract classes better suited to evolving a class hierarchy, i.e. they offer good backward compatibility.
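A tiny, purely hypothetical illustration of that difference: an abstract class can later grow a new method with a default body without breaking existing subclasses, whereas adding a method to an interface would break every implementor (before Java 8 default methods):

// Hypothetical classes, for illustration only.
abstract class LineParser {
    abstract String parse(String line);

    // Added in a later release; existing subclasses keep compiling
    // because a default implementation is provided here.
    String parseTrimmed(String line) {
        return parse(line.trim());
    }
}

class FirstFieldParser extends LineParser {
    @Override
    String parse(String line) {
        return line.split(",")[0];
    }
}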
The new API lives in the org.apache.hadoop.mapreduce package (and its subpackages); the old API remains in org.apache.hadoop.mapred.
The new API makes extensive use of context objects that let user code communicate with the MapReduce framework. For example, MapContext essentially combines the roles of JobConf, OutputCollector, and Reporter.
The new API supports both "push" and "pull" styles of iteration. Both APIs push key/value records to the mapper, but the new API also allows records to be pulled from within the map task; the same applies to the reducer. Pulling records makes it possible to process data in batches rather than one record at a time.
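A rough sketch of the "pull" style (the class name and batching logic below are made up for illustration): a new-API Mapper can override run() and pull records from the context itself, for example to handle them in small batches:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: pull records through the context in run() instead of
// relying on the framework pushing them into map() one at a time.
public class BatchingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int BATCH_SIZE = 100; // hypothetical batch size

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        List<String> batch = new ArrayList<String>();
        while (context.nextKeyValue()) {               // pull the next record
            batch.add(context.getCurrentValue().toString());
            if (batch.size() == BATCH_SIZE) {
                emitBatch(batch, context);
            }
        }
        emitBatch(batch, context);                     // flush the tail
        cleanup(context);
    }

    private void emitBatch(List<String> batch, Context context) throws IOException, InterruptedException {
        for (String line : batch) {
            for (String w : line.split(" ")) {
                context.write(new Text(w), new IntWritable(1));
            }
        }
        batch.clear();
    }
}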
The new API unifies configuration. The old API configured jobs through a special JobConf object, an extension of Hadoop's Configuration object; in the new API that distinction is gone, and all job configuration goes through Configuration.
In the new API, job control is handled by the Job class rather than JobClient, which has been removed.
Output files are named slightly differently: map outputs are named part-m-nnnnn and reduce outputs part-r-nnnnn (where nnnnn is the partition number, an integer starting from 0).
When converting Mapper and Reducer classes written against the old API, remember to change the signatures of map() and reduce() to the new form. If you only change the class declaration to extend the new Mapper or Reducer, the code still compiles without errors or warnings, because the new classes provide equivalent map() and reduce() methods. Your own mapper or reducer code, however, will never be called, which leads to errors that are hard to diagnose.
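For reference, a minimal sketch of the two signatures (the class and its body are made up for illustration); adding @Override is a cheap way to have the compiler catch the wrong signature and avoid the silent "never called" problem described above:

// Old API (org.apache.hadoop.mapred), where the mapper implements an interface:
// public void map(LongWritable key, Text value,
//                 OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { ... }

// New API (org.apache.hadoop.mapreduce), where the mapper extends an abstract class:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewApiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // @Override makes the compiler complain if the signature does not match the new API.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(value.toString()), new IntWritable(1));
    }
}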