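Scenario:
When operating on HBase from MapReduce, a reduce phase is sometimes unnecessary: the Mapper alone reads table A, processes each row, and writes to table B, so A is the input table and B is the output table. The requirement here: table A has a column E:E, and the rows whose E:E value is numeric should be saved to table B.
Tables: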
create 'A','E'
create 'B','E'
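Option 1: open table B directly in the mapper and, if the value is a number, Put it straight into B. This is fairly simple, so its pros and cons are not discussed here.
Option 2: call TableMapReduceUtil.initTableMapperJob with table A as the input, set outputValueClass to Put, and set the output table (TableOutputFormat.OUTPUT_TABLE). No reduce is needed.
Implementation: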
Job:
private static void runJob() {
    String outputTableName = "B";
    String inputTableName = "A";

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.master", XXX);
    conf.set("hbase.zookeeper.quorum", XXX);
    conf.set("hbase.cluster.distributed", "true");
    // TableOutputFormat writes every Put to this table
    conf.set(TableOutputFormat.OUTPUT_TABLE, outputTableName);

    try {
        Scan scan = new Scan();
        Job job = new Job(conf, "DataFormat Task");
        job.setJarByClass(DataFormatTask.class);
        // Table A is the input; the mapper emits (NullWritable, Put) pairs
        TableMapReduceUtil.initTableMapperJob(inputTableName, scan,
                DataFormatMapper.class, NullWritable.class, Put.class, job);
        job.setOutputFormatClass(TableOutputFormat.class);
        // Map-only job: mapper output goes straight to the output format
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
    } catch (Throwable e) {
        throw new RuntimeException("Run DataFormatTask error! ", e);
    } finally {
        HConnectionManager.deleteConnection(conf, true);
    }
}
Main:
public static void main(String[] args) {
    runJob();
}
DataFormatMapper:
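// map() of DataFormatMapper, which extends TableMapper<NullWritable, Put>
protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
    LOGGER.info("key:" + Bytes.toString(key.get()));
    LOGGER.info("row:" + Bytes.toString(value.getRow()));

    // Read column E:E of the current row of table A
    String val = Bytes.toString(value.getValue(Bytes.toBytes("E"), Bytes.toBytes("E")));
    // Skip rows whose value is missing or not made up of digits only
    if (!NumberUtils.isDigits(val)) {
        return;
    }

    // Same row key, same column; TableOutputFormat ignores the output key
    Put put = new Put(key.get());
    put.add(Bytes.toBytes("E"), Bytes.toBytes("E"), Bytes.toBytes(val));
    context.write(NullWritable.get(), put);
}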
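
To make it easier to see which rows of A actually reach B, here is a minimal plain-Java sketch of the filter step alone, with no HBase classes involved. The class DigitFilterSketch and the helper keepNumeric are hypothetical names introduced only for illustration, and isDigits here is an approximation of NumberUtils.isDigits under the assumption that it accepts only non-empty strings consisting entirely of digit characters. Under that assumption, values such as "-5" and "1.5" are treated as non-numeric and dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for the mapper's filter step; no HBase classes involved.
class DigitFilterSketch {

    // Approximates NumberUtils.isDigits (assumption): true only for a
    // non-empty string made up entirely of digit characters.
    static boolean isDigits(String s) {
        if (s == null || s.isEmpty()) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isDigit(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical helper: keeps the rows whose E:E value passes isDigits,
    // mirroring which rows the mapper would write to table B.
    static Map<String, String> keepNumeric(Map<String, String> rowsOfA) {
        Map<String, String> rowsOfB = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : rowsOfA.entrySet()) {
            if (isDigits(e.getValue())) {
                rowsOfB.put(e.getKey(), e.getValue());
            }
        }
        return rowsOfB;
    }

    public static void main(String[] args) {
        // Row key -> value of E:E; null models a row without that column.
        Map<String, String> a = new LinkedHashMap<>();
        a.put("row1", "123");
        a.put("row2", "abc");
        a.put("row3", "-5");
        a.put("row4", "1.5");
        a.put("row5", null);
        System.out.println(keepNumeric(a)); // prints {row1=123}
    }
}
```

If signed or decimal values should also count as numbers for your data, the check in the mapper would need to be replaced with a different test; the requirement as stated above does not say either way.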