第一步:创建myuser2这张表
注意:列族的名字要与myuser表的列族名字相同
hbase(main):010:0> create ‘myuser2’,‘f1’
第二步:创建maven工程,导入jar包
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*/RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
第三步:开发MR的程序
1.Mapper
public class HBaseMapper extends TableMapper<Text, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
byte[] bytes = key.get();
String rowKey = Bytes.toString(bytes);
Put put = new Put(key.get());
Cell[] cells = value.rawCells();
for (Cell cell : cells) {
if ("f2".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
if ("address".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
put.add(cell);
}
// if ("sex".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
// put.add(cell);
// }
}
}
if (!put.isEmpty()){
context.write(new Text(rowKey),put);
}
}
}
2.reduce
public class HBaseReducer extends TableReducer<Text, Put, ImmutableBytesWritable>{
@Override
protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put value : values) {
context.write(null,value);
}
}
}
3、RmHbase
public class RmHbase extends Configured implements Tool {
//读取myuser这张表当中的数据写入到HBase的另外一张表当中去
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "hbaseMr11");
job.setJarByClass(RmHbase.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
////使用TableMapReduceUtil 工具类来初始化我们的mapper
//TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"),scan,HBaseMapper.class,Text.class,Put.class,job)
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"),scan,HBaseMapper.class,Text.class,Put.class,job);
//使用TableMapReduceUtil 工具类来初始化我们的reducer
TableMapReduceUtil.initTableReducerJob("myuser3",HBaseReducer.class,job);
return job.waitForCompletion(false)?0:1;
}
public static void main(String[] args)throws Exception {
Configuration configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
int run = ToolRunner.run(configuration,new RmHbase(),args);
System.exit(run);
}
}
结果: