MapReduce: Reading from HBase, Processing, and Writing Back to HBase
The code is as follows:
package com.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @create 2018-09-17
 * Read from HBase, process, and write back to HBase.
 * Strip signature files from the jar before submitting:
 * zip -d HBaseToHBase.jar 'META-INF/*.SF' 'META-INF/*.RSA' 'META-INF/*SF'
 */
public class HBaseToHBase extends Configured implements Tool {

    private static String addr = "HDP233,HDP232,HDP231";
    private static String port = "2181";

    public enum Counters { ROWS, COLS, VALID, ERROR, EMPTY, NOT_EMPTY }

    static class ParseMapper extends TableMapper<ImmutableBytesWritable, Put> {

        private byte[] columnFamily = null;

        @Override
        protected void setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
                throws IOException, InterruptedException {
            // The column family is handed to the mapper through the job configuration (set in run()).
            columnFamily = Bytes.toBytes(context.getConfiguration().get("conf.columnfamily"));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
                throws IOException, InterruptedException {
            context.getCounter(Counters.ROWS).increment(1);
            String hbaseValue = null;
            Put put = new Put(key.get());
            for (Cell cell : value.listCells()) {
                context.getCounter(Counters.COLS).increment(1);
                hbaseValue = Bytes.toString(CellUtil.cloneValue(cell));
                if (hbaseValue.length() > 0) {
                    // Split the cell value: the first half goes to "top", the second half
                    // (note: excluding the final character) goes to "detail".
                    String top = hbaseValue.substring(0, hbaseValue.length() / 2);
                    String detail = hbaseValue.substring(hbaseValue.length() / 2, hbaseValue.length() - 1);
                    put.addColumn(columnFamily, Bytes.toBytes("top"), Bytes.toBytes(top));
                    put.addColumn(columnFamily, Bytes.toBytes("detail"), Bytes.toBytes(detail));
                    context.getCounter(Counters.NOT_EMPTY).increment(1);
                } else {
                    put.addColumn(columnFamily, Bytes.toBytes("empty"), Bytes.toBytes(hbaseValue));
                    context.getCounter(Counters.EMPTY).increment(1);
                }
            }
            try {
                context.write(key, put);
                context.getCounter(Counters.VALID).increment(1);
            } catch (Exception e) {
                e.printStackTrace();
                context.getCounter(Counters.ERROR).increment(1);
            }
        }
    }

    static class ParseTableReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // Identity reducer: forward every Put unchanged (unused while setNumReduceTasks(0)).
            for (Put put : values) {
                context.write(key, put);
            }
        }
    }

    public int run(String[] args) throws Exception {
        String table = args[0];
        String column = args[1];
        String destTable = args[2];

        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", addr);
        configuration.set("hbase.zookeeper.property.clientPort", port);

        Scan scan = new Scan();
        if (null != column) {
            // Parse "family:qualifier" (or a bare "family") into its byte[] parts.
            byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
            if (colkey.length > 1) {
                scan.addColumn(colkey[0], colkey[1]);
                configuration.set("conf.columnfamily", Bytes.toString(colkey[0]));
                configuration.set("conf.columnqualifier", Bytes.toString(colkey[1]));
            } else {
                scan.addFamily(colkey[0]);
                configuration.set("conf.columnfamily", Bytes.toString(colkey[0]));
            }
        }

        Job job = Job.getInstance(configuration);
        job.setJobName("HBaseToHBase");
        job.setJarByClass(HBaseToHBase.class);
        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, destTable);
        job.setMapperClass(ParseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // job.setReducerClass(ParseTableReducer.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setInputFormatClass(TableInputFormat.class);
        TableInputFormat.addColumns(scan, KeyValue.parseColumn(Bytes.toBytes(column)));
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setNumReduceTasks(0);

        // Using TableMapReduceUtil here fails with a class-not-found error:
        // Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
        // TableMapReduceUtil.initTableMapperJob(table, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class, job);
        // TableMapReduceUtil.initTableReducerJob(table, IdentityTableReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Source table, column(s) to read, destination table.
        String[] params = new String[] { "test_table_mr", "data:info", "test_table_dest" };
        int exitCode = ToolRunner.run(new HBaseToHBase(), params);
        System.exit(exitCode);
    }
}
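One prerequisite the listing does not show: TableOutputFormat writes into an existing table, so the destination table (with the column family the mapper writes to) must be created before the job is submitted. Below is a minimal sketch using the HBase 1.x client API; the CreateDestTable class name is mine, and it assumes the same ZooKeeper quorum as the job above.

package com.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateDestTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "HDP233,HDP232,HDP231");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("test_table_dest");
            if (!admin.tableExists(name)) {
                // The mapper writes the top/detail/empty qualifiers under the "data" family.
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("data"));
                admin.createTable(desc);
            }
        }
    }
}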
Packaging and testing
zip -d HBaseToHBase.jar 'META-INF/*.SF' 'META-INF/*.RSA' 'META-INF/*SF'
hadoop jar HBaseToHBase.jar com.hbase.mapreduce.HBaseToHBase
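To check the result, scanning the destination table should show the top/detail (or empty) qualifiers produced by the mapper. A small sketch of such a scan with the 1.x client API (the VerifyDest class name is illustrative):

package com.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyDest {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "HDP233,HDP232,HDP231");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("test_table_dest"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                for (Cell cell : result.listCells()) {
                    // Print row, family:qualifier and value for each cell the job wrote.
                    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + " "
                            + Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                            + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
}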
Problems encountered
At first I used TableMapReduceUtil, but it threw the error below:
Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/core/MetricsRegistry
	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(TableMapReduceUtil.java:732)
	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(TableMapReduceUtil.java:777)
	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:212)
	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:168)
	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:291)
	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:92)
	at com.hbase.mapreduce.HBaseToHBase.run(HBaseToHBase.java:108)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
	at com.hbase.mapreduce.HBaseToHBase.main(HBaseToHBase.java:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:233)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 16 more
Solution: do not use TableMapReduceUtil; setting the input/output formats and table names on the Job step by step, as in run() above, avoids the problem.
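If you would rather keep TableMapReduceUtil, the stack trace points at addHBaseDependencyJars, so another possible fix is to pass addDependencyJars=false to the init methods and make the HBase jars (including the missing metrics-core jar) visible to the tasks yourself, e.g. through HADOOP_CLASSPATH or a fat jar as in the zip -d workflow above. A hedged, untested sketch of how run() would change, assuming the boolean overloads in the 1.x TableMapReduceUtil API:

// Replace the manual setInputFormatClass/setOutputFormatClass wiring with:
TableMapReduceUtil.initTableMapperJob(
        table, scan, ParseMapper.class,
        ImmutableBytesWritable.class, Put.class,
        job, false);                        // false: skip addHBaseDependencyJars()
TableMapReduceUtil.initTableReducerJob(
        destTable, null, job,               // null reducer: identity pass-through
        null, null, null, null, false);     // false: skip dependency-jar shipping
return job.waitForCompletion(true) ? 0 : 1;

With dependency-jar shipping disabled, the initTable* calls never reach the code path that tried to load com.yammer.metrics.core.MetricsRegistry, which is what failed in the stack trace above.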