HBase 结合 MapReduce

HBase 结合 MapReduce

为什么需要用 MapReduce 去访问 HBase 的数据?——加快分析速度和扩展分析能力MapReduce 访问 HBase 数据作分析一定是在离线分析的场景下应用
在这里插入图片描述
(一)HBaseToHDFS
从 HBase 中读取数据,分析之后然后写入 HDFS,代码实现:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 作者: Lv_Hulk
* 时间: 2018/11/6 16:08
* 描述: 编写 mapreduce 程序从 hbase 读取数据,然后存储到 hdfs
*/
public class HBaseDataToHDFSMR {
 public static final String ZK_CONNECT = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
 public static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
 public static final String HDFS_CONNECT = "hdfs://myhadoop01/";
 public static final String HDFS_CONNECT_KEY = "fs.defaultFS";
 public static void main(String[] args) throws Exception {
 Configuration conf = HBaseConfiguration.create();
 conf.set(ZK_CONNECT_KEY, ZK_CONNECT);
 conf.set(HDFS_CONNECT_KEY, HDFS_CONNECT);
 System.setProperty("HADOOP_USER_NAME", "hadoop");
 Job job = Job.getInstance(conf);
 // 输入数据来源于 hbase 的 user_info 表
 Scan scan = new Scan();
 TableMapReduceUtil.initTableMapperJob("user_info", scan,
 HBaseDataToHDFSMRMapper.class, Text.class, NullWritable.class, job);
 // RecordReader --- TableRecordReader
 // InputFormat ----- TextInputFormat
 // 数据输出到 hdfs
 FileOutputFormat.setOutputPath(job, new Path("/hbase2hdfs/output2"));
 boolean waitForCompletion = job.waitForCompletion(true);
 System.exit(waitForCompletion ? 0 : 1);
 }
 /**
 * mapper 的输入 key-value 类型是:ImmutableBytesWritable, Result
 * mapper 的输出 key-value 类型就可以由用户自己制定
 */
 static class HBaseDataToHDFSMRMapper extends TableMapper<Text, NullWritable> {
 /**
 * keyType: LongWritable -- ImmutableBytesWritable:rowkey
 * ValueType: Text -- Result:hbase 表中某一个 rowkey 查询出来的所有的 key-value 对
 */
 @Override
 protected void map(ImmutableBytesWritable key, Result value, Context context) 
throws IOException, InterruptedException {
 // byte[] rowkey = Bytes.copy(key, 0, key.getLength());
 String rowkey = Bytes.toString(key.copyBytes());
 List<Cell> listCells = value.listCells();
 Text text = new Text();
 // 最后输出格式是: rowkye, base_info:name-huangbo, base-info:age-34
 for (Cell cell : listCells) {
 String family = new String(CellUtil.cloneFamily(cell));
 String qualifier = new String(CellUtil.cloneQualifier(cell));
 String v = new String(CellUtil.cloneValue(cell));
 long ts = cell.getTimestamp();
 text.set(rowkey + "\t" + family + "\t" + qualifier + "\t" + v + "\t" + ts);
 context.write(text, NullWritable.get());
 }
 }
 }
}

(二)HDFSToHBase
从 HDFS 从读入数据,处理之后写入 HBase,代码实现:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 作者: Lv_Hulk
* 时间: 2018/11/6 17:03
* 需求:读取 HDFS 上的数据。插入到 HBase 库中
* 
* 程序运行之前,要先做两件事:
* 1、把 student.txt 文件放入:/bigdata/student/input/目录中
* 2、创建好一张 hbase 表:
* create "student", "info"
*/
public class HDFSDataToHBaseMR extends Configured implements Tool{
 public static void main(String[] args) throws Exception {
 int run = ToolRunner.run(new HDFSDataToHBaseMR(), args);
 System.exit(run);
 }
 @Override
 public int run(String[] arg0) throws Exception {
 Configuration config = HBaseConfiguration.create();
 config.set("hbase.zookeeper.quorum", 
"hadoop01:2181,hadoop02:2181,hadoop03:2181");
 System.setProperty("HADOOP_USER_NAME", "hadoop");
 Job job = Job.getInstance(config, "HDFSDataToHBaseMR");
 
 job.setJarByClass(HDFSDataToHBaseMR.class);
 
 job.setMapperClass(HBaseMR_Mapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(NullWritable.class);
 
 // 设置数据的输出组件
 TableMapReduceUtil.initTableReducerJob("student", HBaseMR_Reducer.class, job);
 
 job.setOutputKeyClass(NullWritable.class);
 job.setOutputValueClass(Put.class);
 
 FileInputFormat.addInputPath(job, new Path("/bigdata/student/input"));
 
 boolean isDone = job.waitForCompletion(true);
 return isDone ? 0: 1;
 }
 public static class HBaseMR_Mapper extends Mapper<LongWritable, Text, Text, 
NullWritable>{
 
 @Override
 protected void map(LongWritable key, Text value, Context context) throws 
IOException, InterruptedException {
 context.write(value, NullWritable.get());
 }
 }
 
 public static class HBaseMR_Reducer extends TableReducer<Text, NullWritable, 
NullWritable>{
 
 @Override
 protected void reduce(Text key, Iterable<NullWritable> values, Context context) 
throws IOException, InterruptedException {
 String[] split = key.toString().split(",");
 Put put = new Put(split[0].getBytes());
 put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
 put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
 put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
 put.addColumn("info".getBytes(), "department".getBytes(), 
split[4].getBytes());
 
 context.write(NullWritable.get(), put);
 }
 }
}

猜你喜欢

转载自blog.csdn.net/lv_hulk/article/details/87106010