If the target table already contains data, it will not be overwritten (the disable/delete calls in the code are commented out).
If you get a ClassNotFoundException, locate the missing jar and copy it into hadoop/share/hadoop/common/ (see the sketch after the jar list for an in-job alternative).
This is the list of jars I copied over while running various Hadoop and HBase tests:
hadoop-common-2.5.2.jar
hadoop-nfs-2.5.2.jar
hbase-common-1.1.4.jar
hbase-protocol-1.1.4.jar
htrace-core-3.1.0-incubating.jar
mysql-connector-java-5.1.38-bin.jar
hadoop-common-2.5.2-tests.jar
hbase-client-1.1.4.jar
hbase-hadoop-compat-1.1.4.jar
hbase-server-1.1.4.jar
metrics-core-2.2.0.jar
netty-all-4.0.23.Final.jar
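Instead of copying jars by hand, the HBase MapReduce utilities can ship their dependency jars with the job. A minimal sketch, assuming a Job built the same way as in the full program below; the class name ShipHBaseJars is just for illustration. Note that initTableMapperJob already does this for the task side by default, so the manual copy above mostly matters for the classpath of the JVM that launches the job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class ShipHBaseJars {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "New Word Count");
        // Adds the HBase jars (and their transitive dependencies) to the
        // job's classpath so the map/reduce tasks can find them, instead
        // of relying on jars copied into hadoop/share/hadoop/common/.
        TableMapReduceUtil.addDependencyJars(job);
    }
}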
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/*
 * Word count over an HBase table with MapReduce:
 * reads words from the source table and writes per-word counts to the target table.
 */
public class HBaseTableMapReduce {

    /*
     * Map class.
     * Output key type: Text (the word); output value type: IntWritable (always 1).
     */
    public static class Map extends TableMapper<Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /*
         * The parameter types are fixed by TableMapper:
         * row key, row contents, and the MapReduce context.
         */
        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            // Read the word from column content:count of the current row.
            String count = Bytes.toString(
                    values.getValue(Bytes.toBytes("content"), Bytes.toBytes("count")));
            word.set(count);
            context.write(word, one);
        }
    }

    /*
     * Reduce class.
     * Input key/value types match the map output; for a TableReducer the
     * output key is ignored (null is fine) and the output value is a Put.
     */
    public static class Reduce extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }

            // One Put per word, i.e. one row per word.
            Put put = new Put(Bytes.toBytes("wordcount_" + key));
            // Column family content, qualifier count, value = the count.
            put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            // TableOutputFormat ignores the key, so null is acceptable here.
            context.write(null, put);
        }
    }

    // Create the HBase output table.
    public static void createHBaseTable(String tableName) throws IOException {
        // Table descriptor.
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
        // Column family descriptor.
        HColumnDescriptor col = new HColumnDescriptor("content");
        htd.addFamily(col);

        // HBase configuration.
        Configuration conf = HBaseConfiguration.create();
        // conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        // conf.set("hbase.zookeeper.property.clientPort", "2181");
        HBaseAdmin hAdmin = new HBaseAdmin(conf);
        if (hAdmin.tableExists(tableName)) {
            System.out.println("Table already exists: " + tableName);
            // hAdmin.disableTable(tableName);
            // hAdmin.deleteTable(tableName);
        } else {
            System.out.println("Creating table: " + tableName);
            hAdmin.createTable(htd);
        }
        hAdmin.close();
    }

    public static void main(String[] args) throws Exception {
        String sourceTable = "wordcount";
        String targetTable = "wordcount_mapreduce";

        // Step 1: make sure the output table exists.
        createHBaseTable(targetTable);

        // Step 2: configure and run the MapReduce job.
        // Use HBaseConfiguration so hbase-site.xml is picked up; these
        // settings are the critical ones when the defaults don't point at your cluster:
        // conf.set("mapred.job.tracker", "master:9001");
        // conf.set("hbase.zookeeper.quorum", "master");
        // conf.set("hbase.zookeeper.property.clientPort", "2181");
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "New Word Count");
        job.setJarByClass(HBaseTableMapReduce.class);

        Scan scan = new Scan();
        scan.setCaching(100);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs

        TableMapReduceUtil.initTableMapperJob(
                sourceTable,       // input table
                scan,              // Scan instance to control CF and attribute selection
                Map.class,         // mapper class
                Text.class,        // mapper output key
                IntWritable.class, // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                targetTable,       // output table
                Reduce.class,      // reducer class
                job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
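To sanity-check the job's output, a plain client-side scan of the target table works. A minimal sketch, assuming the same cluster configuration and the table/family names used above; the class name VerifyWordCount is just for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("wordcount_mapreduce"));
        // Scan every row and print "wordcount_<word> -> <count>".
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            String row = Bytes.toString(result.getRow());
            String count = Bytes.toString(
                    result.getValue(Bytes.toBytes("content"), Bytes.toBytes("count")));
            System.out.println(row + " -> " + count);
        }
        scanner.close();
        table.close();
        connection.close();
    }
}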