Using MapReduce to Parse Data from HDFS and Write It into an HBase Table

Contents

I. Preparation

II. Implementation

1. Create a Maven project and import dependencies

2. Write the custom Mapper class

3. Write the custom Reducer class

4. Write the custom Runner class

5. Run the code and check the results


The previous post implemented data migration between HBase tables:

Data migration between HBase tables (implemented with MapReduce)

This post continues with a small example showing how to use MapReduce to write data from HDFS into an HBase table. The implementation is as follows.

I. Preparation

1) Start the ZooKeeper, Hadoop, and HBase services.
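If the services are not yet running, they can be started with the usual scripts. This is only a sketch assuming a single-node setup like the one used here, with the scripts on the PATH; adjust to your own environment:

[hadoop@weekend110 ~]$ zkServer.sh start
[hadoop@weekend110 ~]$ start-dfs.sh
[hadoop@weekend110 ~]$ start-yarn.sh
[hadoop@weekend110 ~]$ start-hbase.sh

Afterwards, jps should list processes similar to the following: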

[hadoop@weekend110 ~]$ jps
3904 HMaster
2978 DataNode
2722 QuorumPeerMain
9428 Jps
2868 NameNode
3449 NodeManager
3338 ResourceManager
4043 HRegionServer
8763 Main
3167 SecondaryNameNode

2) Prepare a test data file person.txt and upload it to HDFS:
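A minimal sketch of the upload, assuming person.txt is in the current local directory and using the /test directory that appears in the listing below:

[hadoop@weekend110 datas]$ hdfs dfs -mkdir -p /test
[hadoop@weekend110 datas]$ hdfs dfs -put person.txt /test/

The uploaded file can then be checked with hdfs dfs -cat: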

[hadoop@weekend110 datas]$ hdfs dfs -cat /test/person.txt 
20/07/23 02:40:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
001     Michael 133xxxx5400
002     Andy    134xxxx5170
003     Justin  135xxxx5102
004     Ace     136xxxx6103
005     Luo     137xxxx5104
006     Liu     138xxxx7200
007     Li      139xxxx5103
008     Long    147xxxx5196
009     Zhang   157xxxx8100
010     Lu      177xxxx6800

3) Create a test table table777_mr in HBase with two column families, "information" and "contact":
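The table can be created in the HBase shell, for example (the shell prompt number is illustrative):

hbase(main):001:0> create 'table777_mr', 'information', 'contact'

describe then shows the two column families: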

hbase(main):007:0> describe 'table777_mr'
Table table777_mr is ENABLED
table777_mr
COLUMN FAMILIES DESCRIPTION
{NAME => 'contact', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'information', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0180 seconds

II. Implementation

1. Create a Maven project and import dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.3.3</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
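The Hadoop client classes needed by the MapReduce job are normally pulled in transitively by hbase-server. If your build cannot resolve them, or the versions do not match your cluster, an explicit dependency can be added as well; the version below is only a placeholder and should be replaced with your cluster's Hadoop version:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>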

2. Write the custom Mapper class

package xsluo;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class ReadFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of data from HDFS
        String line = value.toString();
        // Split the line on \t and store the fields in a String array
        String[] words = line.split("\t");

        // Pick out the fields according to their meaning in the data
        String rowKey = words[0];
        String name = words[1];
        String phone = words[2];

        // Initialize the row key
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
        // Initialize the Put object
        Put put = new Put(Bytes.toBytes(rowKey));
        // Arguments: column family, column qualifier, value
        put.addColumn(Bytes.toBytes("information"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.addColumn(Bytes.toBytes("contact"), Bytes.toBytes("phone"), Bytes.toBytes(phone));

//        System.out.println(">>>>>>>>>>put: "+put.toString());
        context.write(rowKeyWritable, put);
    }
}

3. Write the custom Reducer class

package xsluo;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;

public class WriteToHBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Write each Put that was read to the table777_mr table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

4. Write the custom Runner class

package xsluo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class HdfsToHBaseRunner extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        // Cluster configuration: ZooKeeper quorum and HMaster address
        configuration.set("hbase.zookeeper.quorum", "192.168.2.100,192.168.2.101,192.168.2.102");
        configuration.set("hbase.master", "192.168.2.100:60000");
        int status = ToolRunner.run(configuration, new HdfsToHBaseRunner(), args);
        System.exit(status);
    }

    public int run(String[] strings) throws Exception {
        // Get the Configuration object
        Configuration conf = this.getConf();

        // Create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(HdfsToHBaseRunner.class);
        Path inPath = new Path("hdfs://192.168.2.100:9000/test/person.txt");
        FileInputFormat.addInputPath(job, inPath);

        // Configure the Mapper
        job.setMapperClass(ReadFromHDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Configure the Reducer to write into the HBase table
        TableMapReduceUtil.initTableReducerJob("table777_mr", WriteToHBaseReducer.class, job);

        // Set the number of reduce tasks (at least 1)
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if(!isSuccess){
            throw new IOException("Job running with error");
        }

        return isSuccess ? 0 : 1;
    }
}

5. Run the code and check the results

Run the main program; after it finishes, check the imported result:
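The main program can be run directly from the IDE, since the Runner's main method already sets the ZooKeeper and HMaster addresses. Alternatively, the project can be packaged (with its HBase dependencies on the classpath) and submitted with hadoop jar; the jar name here is hypothetical and depends on your Maven build:

hadoop jar hdfs-to-hbase.jar xsluo.HdfsToHBaseRunner

After the job completes, the imported rows can be inspected in the HBase shell:

hbase(main):008:0> scan 'table777_mr'

Each of the row keys 001 through 010 should have a value under information:name and one under contact:phone.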


Reposted from blog.csdn.net/weixin_43230682/article/details/107537675