1. Configure hbase-site.xml to point to HDFS
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://bigdata-senior01.home.com:9000/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>hdfs://bigdata-senior01.home.com:9000/hbase/zookeeper</value>
  </property>
  <property>
    <name>hbase.unsafe.stream.capability.enforce</name>
    <value>false</value>
    <description>
      Controls whether HBase will check for stream capabilities (hflush/hsync).
      Disable this if you intend to run on LocalFileSystem, denoted by a rootdir
      with the 'file://' scheme, but be mindful of the NOTE below.
      WARNING: Setting this to false blinds you to potential data loss and
      inconsistent system state in the event of process and/or node failures.
      If HBase is complaining of an inability to use hsync or hflush it's most
      likely not a false positive.
    </description>
  </property>
</configuration>
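Once HBase has been restarted with this configuration, any client with hbase-site.xml on its classpath will pick it up automatically. A minimal sketch to confirm the cluster is reachable from code, assuming hbase-site.xml is on the application classpath (the class name here is just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CheckConnection {
    public static void main(String[] args) throws Exception {
        // Reads hbase-site.xml (and hbase-default.xml) from the classpath
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // List the existing tables to prove the connection works
            for (TableName name : admin.listTableNames()) {
                System.out.println(name.getNameAsString());
            }
        }
    }
}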
2. Dependencies
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.0.4</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-mapreduce</artifactId>
  <version>2.0.4</version>
</dependency>
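The hbase-mapreduce artifact is what provides TableOutputFormat and the other MapReduce integration classes used below. If you want to sanity-check that it really is on your classpath, a throwaway sketch like this prints the jar it was loaded from (class name is illustrative):

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

public class DependencyCheck {
    public static void main(String[] args) {
        // Prints the location of the jar that supplied TableOutputFormat
        System.out.println(TableOutputFormat.class
                .getProtectionDomain().getCodeSource().getLocation());
    }
}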
3. Mapper
import java.io.IOException;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: lines of text; output: the row key bytes as the key and an HBase Mutation as the value
public class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Mutation> {

    // Counter for the number of lines processed
    public enum Counters { LINES }

    private byte[] family = null;
    private byte[] qualifier = null;

    /**
     * Called once at the beginning of the task.
     * Reads the column family and qualifier that the driver stored in the
     * "conf.column" property, which in turn came from the command line.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        String column = context.getConfiguration().get("conf.column");
        ColParser parser = new ColParser();
        parser.parse(column);
        if (!parser.isValid()) throw new IOException("family or qualifier error");
        family = parser.getFamily();
        qualifier = parser.getQualifier();
    }

    /**
     * Called once for each key/value pair in the input split.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            String line = value.toString();
            // Hash each line to use as the row key; adjust to your own needs
            byte[] rowKey = DigestUtils.md5(line);
            Put put = new Put(rowKey);
            put.addColumn(this.family, this.qualifier, Bytes.toBytes(line));
            context.write(new ImmutableBytesWritable(rowKey), put);
            context.getCounter(Counters.LINES).increment(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Parses "family:qualifier" strings such as "data:json"
    class ColParser {
        private byte[] family;
        private byte[] qualifier;
        private boolean valid;

        public byte[] getFamily() { return family; }
        public byte[] getQualifier() { return qualifier; }
        public boolean isValid() { return valid; }

        public void parse(String value) {
            try {
                String[] sValue = value.split(":");
                if (sValue == null || sValue.length < 2
                        || sValue[0].isEmpty() || sValue[1].isEmpty()) {
                    valid = false;
                    return;
                }
                family = Bytes.toBytes(sValue[0]);
                qualifier = Bytes.toBytes(sValue[1]);
                valid = true;
            } catch (Exception e) {
                valid = false;
            }
        }
    }
}
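The mapper emits one Put per input line, keyed by the MD5 hash of the line, so re-importing the same file overwrites rows instead of duplicating them. A quick standalone sketch of that derivation, using a made-up sample record and the data:json column from the example below:

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyDemo {
    public static void main(String[] args) {
        String line = "{\"id\":1,\"name\":\"demo\"}";   // made-up sample record
        // Same derivation as ImportMapper: the MD5 of the whole line becomes the row key
        byte[] rowKey = DigestUtils.md5(line);
        System.out.println("row key (hex): " + DigestUtils.md5Hex(line));
        // The Put the mapper would emit for this line
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("json"), Bytes.toBytes(line));
        System.out.println(put);
    }
}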
4. Driver (main)
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class ImportFromFile {

    // private static String HDFSUri = "hdfs://bigdata-senior01.home.com:9000";
    public static final String NAME = "ImportFromFile";

    // Parses -t <table>, -c <family:qualifier>, -i <path> and the optional -d debug switch
    private static CommandLine parseArgs(String[] args) throws ParseException {
        Options options = new Options();
        Option option = new Option("t", "table", true, "table name (required)");
        option.setArgName("table-name");
        option.setRequired(true);
        options.addOption(option);
        option = new Option("c", "column", true, "column family and qualifier (required)");
        option.setArgName("family:qualifier");
        option.setRequired(true);
        options.addOption(option);
        option = new Option("i", "input", true, "input file or directory");
        option.setArgName("path-in-HDFS");
        option.setRequired(true);
        options.addOption(option);
        options.addOption("d", "debug", false, "switch on DEBUG log level");

        CommandLineParser parser = new PosixParser();
        CommandLine cmd = null;
        try {
            cmd = parser.parse(options, args);
        } catch (Exception e) {
            System.err.println("ERROR: " + e.getMessage() + "\n");
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(NAME + " ", options, true);
            System.exit(-1);
        }
        if (cmd.hasOption("d")) {
            Logger log = Logger.getLogger("mapreduce");
            log.setLevel(Level.DEBUG);
        }
        return cmd;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] runArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        CommandLine cmd = parseArgs(runArgs);
        if (cmd.hasOption("d")) conf.set("conf.debug", "true");
        String table = cmd.getOptionValue("t");
        String input = cmd.getOptionValue("i");
        String column = cmd.getOptionValue("c");
        // Stored in the job configuration so the mapper can read it back in setup()
        conf.set("conf.column", column);

        Job job = Job.getInstance(conf, "Import from file " + input + " into table " + table);
        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        job.setNumReduceTasks(0); // map-only job, no reduce phase needed
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
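One optional refinement, if the HBase jars are not already on the cluster's MapReduce classpath, is to let hbase-mapreduce ship them with the job via its TableMapReduceUtil helper. A minimal sketch of that idea, wrapped in a hypothetical helper class (the original driver does not do this):

import java.io.IOException;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class JobClasspathHelper {
    // Call this from main() after the output format has been configured.
    // It adds the HBase client jars (and their dependencies) to the job's
    // distributed cache so the map tasks can load TableOutputFormat at runtime.
    public static void shipHBaseJars(Job job) throws IOException {
        TableMapReduceUtil.addDependencyJars(job);
    }
}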
5. Run
First create the target table in the HBase shell:

create 'importTable','data'

Then, with the input file already uploaded to HDFS, copy the jar to a cluster node and run:

hadoop jar ImportFromFile.jar -t importTable -i /input/test-data.txt -c data:json
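To confirm the rows landed in the table once the job finishes, a small scan sketch like the one below can be used. The table name, family, and qualifier match the example above; the class name and row limit are arbitrary.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyImport {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("importTable"))) {
            // Fetch only a handful of rows; enough to see the imported lines
            Scan scan = new Scan().setLimit(5);
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    byte[] value = result.getValue(Bytes.toBytes("data"), Bytes.toBytes("json"));
                    System.out.println(Bytes.toStringBinary(result.getRow())
                            + " => " + Bytes.toString(value));
                }
            }
        }
    }
}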