建表以及存数据
// Creates the input table "word" (column family "col") and the output table
// "stat" (column family "ret"), then loads three sample sentences into "word".
public class InitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // BUG FIX: the configuration built above was never used — the original
        // called createConnection() with no arguments. Pass conf explicitly.
        // try-with-resources guarantees connection/admin/table are closed even
        // if table creation or the batch put throws.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            createTableIfMissing(admin, "word", "col");
            createTableIfMissing(admin, "stat", "ret");
            try (Table table = connection.getTable(TableName.valueOf("word"))) {
                List<Put> puts = new ArrayList<>();
                puts.add(sentencePut("rk001", "hello tom how are you"));
                puts.add(sentencePut("rk002", "hi hbase i am study"));
                puts.add(sentencePut("rk003", "hadoop hadoop hello you tom hbase"));
                table.put(puts);
            }
        }
    }

    // Creates table {name} with a single column family {family}. Skips creation
    // when the table already exists, so re-running this class no longer fails
    // with TableExistsException.
    private static void createTableIfMissing(Admin admin, String name, String family) throws IOException {
        TableName tableName = TableName.valueOf(name);
        if (admin.tableExists(tableName)) {
            return;
        }
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        descriptor.addFamily(new HColumnDescriptor(family));
        admin.createTable(descriptor);
    }

    // Builds a Put for row {rowKey} storing {sentence} under col:line.
    private static Put sentencePut(String rowKey, String sentence) {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("col"), Bytes.toBytes("line"), Bytes.toBytes(sentence));
        return put;
    }
}
词频分析主要代码
/**
 * Word count over HBase tables.
 *
 * <p>Input table "word": family "col", qualifier "line" holds one sentence per row.
 * Output table "stat": family "ret", qualifier "count" holds the occurrence count,
 * with the word itself used as the rowkey.
 */
public class HbaseMR {

    public static class MapTask extends TableMapper<Text, IntWritable> {
        // Reused writables avoid allocating two fresh objects per emitted word.
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            byte[] raw = value.getValue(Bytes.toBytes("col"), Bytes.toBytes("line"));
            // BUG FIX: a row without a col:line cell made new String(null) throw NPE.
            if (raw == null) {
                return;
            }
            // Bytes.toString decodes UTF-8 explicitly; new String(byte[]) used the
            // platform default charset, which need not match how HBase stored the data.
            // Splitting on \s+ also tolerates runs of whitespace (split(" ") emitted
            // empty tokens for double spaces).
            for (String token : Bytes.toString(raw).split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    public static class ReduceTask extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // BUG FIX: the original did sum++ and ignored the carried value, which is
            // only correct when every value is exactly 1 (i.e. no combiner). Summing
            // the actual values stays correct if a combiner is ever enabled.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            byte[] row = Bytes.toBytes(key.toString());
            Put put = new Put(row);
            put.addColumn(Bytes.toBytes("ret"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(new ImmutableBytesWritable(row), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "hbasemr");
        job.setJarByClass(HbaseMR.class);
        Scan scan = new Scan();
        // Recommended settings for a full-table MapReduce scan: fetch rows in
        // batches and skip the server-side block cache (one-shot sequential read).
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob("word", scan, MapTask.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("stat", ReduceTask.class, job);
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "程序没毛病" : "出bug了");
        // Propagate success/failure to the shell so scripts can detect a failed job.
        System.exit(b ? 0 : 1);
    }
}
在集群上运行
将项目打成jar包,拷贝到集群中任意一台配置了 HBase 客户端环境的节点上(注意:是拷贝到节点本地文件系统,不是上传到 HDFS,因为下面用 java -cp 直接在本地 JVM 中运行),使用命令执行:
// jar包 要执行的Java的全类名
java -cp mr.jar:`hbase classpath` cn.pengpeng.mr.HbaseMR