import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ReadHdfsToHbase {
public static void main(String[] args) {
Configuration conf=new Configuration();
conf.set(“fs.defaultFS”, “hdfs://bd1803”);
conf.set(“hbase.zookeeper.quorum”, “centos01:2181,centos02:2181,centos03:2181”);
System.setProperty(“HADOOP_USER_NAME”, “centos”);
try {
Job job=Job.getInstance(conf);
job.setJarByClass(ReadHdfsToHbase.class);
job.setMapperClass(MyMapper.class);
TableMapReduceUtil.initTableReducerJob("mingxing", MyReducer.class, job,
null, null, null, null, false);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Mutation.class);
FileInputFormat.addInputPath(job, new Path("/mingxingin"));
job.waitForCompletion(true);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
static class MyReducer extends TableReducer<Text, NullWritable, NullWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values,
Reducer<Text, NullWritable, NullWritable, Mutation>.Context context)
throws IOException, InterruptedException {
String line = key.toString();
String[] split = line.split(",");
//zhangfenglun,M,20,13522334455,[email protected],23521472
Put p=new Put(split[0].getBytes());
p.add("basicinfo".getBytes(), "sex".getBytes(), split[1].getBytes());
p.add("basicinfo".getBytes(), "age".getBytes(), split[2].getBytes());
p.add("extrainfo".getBytes(), "phone".getBytes(), split[3].getBytes());
p.add("extrainfo".getBytes(), "email".getBytes(), split[4].getBytes());
p.add("extrainfo".getBytes(), "qq".getBytes(), split[5].getBytes());
context.write(NullWritable.get(), p);
}
}
}