为了使用MapReduce任务来并行处理大规模HBase数据,HBase对MapReduce API进行了扩展:
常用的HBase MapReduce API与Hadoop MapReduce API对应关系如下表
HBase MapReduce API | Hadoop MapReduce API |
org.apache.hadoop.hbase.mapreduce.TableMapper | org.apache.hadoop.mapreduce.Mapper |
org.apache.hadoop.hbase.mapreduce.TableReducer | org.apache.hadoop.mapreduce.Reducer |
org.apache.hadoop.hbase.mapreduce.TableInputFormat | org.apache.hadoop.mapreduce.InputFormat |
org.apache.hadoop.hbase.mapreduce.TableOutputFormat | org.apache.hadoop.mapreduce.OutputFormat |
HBase不同表间数据转移
HBase使用MapReduce任务将一张表中的部分数据转移到另外一张表中,并且保留源表的数据。
已知HBase中已经存在一张表"student",且其中有三条数据,如下表
rowkey(行键) | info(列族) |
001 | info:name="zhangsan" |
info:age=21 | |
info:address="beijin" | |
002 | info:name="lisi" |
info:age=19 | |
info:address="shanghai" | |
003 | info:name="wangwu" |
info:age=18 | |
info:address="shandong" |
现在需要将表"student"中的"info:name"列和"info:age"列的数据转移到另外一张表"student_new"中,这两张表结构相同,且表"student_new"需要提前在HBase中创建好.
创建命令如下:
create 'student_new','info'
创建Java Maven工程
引入API依赖的jar包
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.4.11</version> </dependency> <!-- HBase MapReducer API--> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.4.11</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.4.11</version> </dependency> <!-- 指定JDK工具包的位置,需要本地配置好环境变量JAVA_HOME--> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency>
写Hadoop的MapReduce程序步骤:
经典:Mapper接口实现
package org.jy.data.bigdata.hbase.hadoop.mapreduce; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; /** * 从HBase中读取表student的指定列的数据 * 这些列都是关于HBase MapReducer api hbase-server.jar中的类 */ public class ReadStudentMapper extends TableMapper<ImmutableBytesWritable,Put> { /** * * @param key * @param value * @param context * @throws IOException * @throws InterruptedException * 参数key为表中的行键,value为表中的一行数据 */ @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { // 新建Put对象,传入行键 Put put = new Put(key.get());// 组合输入的数据 // 遍历一行数据的每一个单元格 for (Cell cell : value.rawCells()){ // 如果当前单元格所属列族为Info if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){ // 判断列族是否相等 // 如果当前单元格的列限定符为name if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ // 将单元格加入到put对象中 put.add(cell); // 如果当前单元格的列限定符为age }else if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ // 将单元格加入到put对象中 put.add(cell); } } } // 将put对象写入到context中作为map的输出 context.write(key,put); // map的输出 } } 经典Reduce接口实现 package org.jy.data.bigdata.hbase.hadoop.mapreduce; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable; import java.io.IOException; /** * 该类接收map阶段读取的数据,并将数据写入到HBase中. * 将数据写入到HBase另外一张表student_new中 */ public class WriterStudentReducer extends TableReducer<ImmutableBytesWritable,Put,NullWritable> { /** * 接收map()方法的输出,参数key和values的类型需与map()方法的输出一致 * @param key * @param values * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { for(Put put : values){ // 将数据写入HBase表中,输出的key可以为空,因为行键在put对象中已经包含 context.write(NullWritable.get(),put); } } }
package org.jy.data.bigdata.hbase.hadoop.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; /** * 该类用于构建与执行MapReducer任务 */ public class StudentMRRunner { /** * main方法,任务执行的入口 * @param args */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 创建Configuration 实例 Configuration configuration = HBaseConfiguration.create(); // 创建Job任务,指定任务名称 Job job = Job.getInstance(configuration,"hbase_mr_job"); // 设置任务运行的主类 job.setJarByClass(StudentMRRunner.class); // 创建Scan 扫描对象 Scan scan = new Scan(); // 是否缓存块数据,默认为true 。设置为false节省了交换缓存的操作消耗,可以提升mr任务的效率。 // MR任务必须设置为false scan.setCacheBlocks(false); // 每次RPC请求从HBase表中取得的数据行数 scan.setCaching(500); // 初始化Mapper任务 // 注意导入的mapreduce包,而不是mapred包,后者是旧版本 TableMapReduceUtil.initTableMapperJob("student",// 数据源表名称 scan, // 扫描控制器 ReadStudentMapper.class, // 指定Mapper类 ImmutableBytesWritable.class ,// Mapper输出的key类型 Put.class,// Mapper输出的value类型 job // 指定任务job ); // 初始化Reduce任务 TableMapReduceUtil.initTableReducerJob("student_new", // 数据目的地表名 WriterStudentReducer.class,job);// 指定reducer类与任务Job // 设置Reducer数量,最少1个 job.setNumReduceTasks(1); // 执行任务 boolean isSuccess = job.waitForCompletion(true); if(!isSuccess){ throw new IOException("任务运行错误!!"); } System.exit(isSuccess ? 0 : 1); } }
启动YARN集群
MapReduce任务运行于YARN集群之上,因此需要先启动YARN集群.
运行MapReduce任务
将maven打出的jar发布到Hadoop集群的主节点中,执行如下命令:
hadoop jar 打出的jar包名.jar org.jy.data.bigdata.hbase.hadoop.mapreduce.StudentMRRunner