HBase表
create 'words','line'
put 'words','1234','line:words','this,is,a,first,line'
put 'words','1235','line:words','this,is,second,,line'
create 'wordcount','info'
示例代码
/**
 * Reads comma-separated lines from the HBase table "words" (column line:words),
 * computes a word count with MapReduce, and writes each word's total into the
 * HBase table "wordcount" (column info:count).
 *
 * @author zhangdong
 */
public class HBaseMR extends Configured implements Tool {

    /** Maps each HBase row (a comma-separated line) to (word, 1) pairs. */
    private static class HBaseMapper extends TableMapper<Text, IntWritable> {
        // Fix: the original used new IntWritable(), whose default value is 0,
        // so every word's sum in the reducer came out as 0.
        private final IntWritable one = new IntWritable(1);
        private final Text text = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            byte[] cell = value.getValue(Bytes.toBytes("line"), Bytes.toBytes("words"));
            if (cell == null) {
                return; // row has no line:words cell; nothing to count (original NPE'd here)
            }
            String words = Bytes.toString(cell);
            for (String word : words.split(",")) {
                // split(",") never yields null elements; only skip empties
                // (e.g. the ",," in the sample data).
                if (!word.isEmpty()) {
                    text.set(word);
                    context.write(text, one);
                }
            }
        }
    }

    /** Sums the counts for each word and writes one Put per word into "wordcount". */
    private static class HBaseReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            if (key == null || key.getLength() == 0) {
                return;
            }
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Fix: Text.getBytes() returns the internal buffer, which may be longer
            // than the valid data — the original could create row keys with trailing
            // garbage bytes. Use only the valid bytes as the row key.
            Put row = new Put(Bytes.toBytes(key.toString()));
            row.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(sum));
            // TableOutputFormat ignores the key; the Put carries the row key itself.
            context.write(null, row);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "node01,node02,node03");
        Job job = Job.getInstance(conf);
        // Fix: use the outer (driver) class so Hadoop resolves and ships the right jar.
        job.setJarByClass(HBaseMR.class);
        job.setNumReduceTasks(1);
        // Full-table scan; narrow with scan.addColumn(...) if the table grows large.
        Scan scan = new Scan();
        // Mapper input comes from the HBase table "words".
        TableMapReduceUtil.initTableMapperJob("words", scan, HBaseMapper.class,
                Text.class, IntWritable.class, job);
        // Reducer output goes to the HBase table "wordcount".
        TableMapReduceUtil.initTableReducerJob("wordcount", HBaseReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new HBaseMR(), args);
        System.exit(result);
    }
}
运行上面的代码时,需要将HBASE_HOME/lib目录设置到hadoop的class环境变量中,或将HBASE_HOME/lib目录下的jar包拷贝到HADOOP_HOME/share/hadoop/mapreduce下。