在windows下使用编辑器（IDE）写代码时,要注意jdk版本和linux下的jdk版本保持一致,避免版本不同造成在linux环境下不能运行的情况。这里使用的是mapreduce框架来做数据的分析。
如果创建的是Maven项目,需要在pom.xml文件中导入相关依赖(小可不太喜欢Maven,所以直接将hbase的lib包中的jar包导入项目,同时避免版本不同造成的不必要的麻烦)。
这里有一些可以用来练手的数据:https://download.csdn.net/download/weixin_43562234/11022425
相关依赖:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
1.Mapper
package test1_HDFS2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ReadHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /** Column family that every cell of the target table is written into. */
    private static final byte[] FAMILY = Bytes.toBytes("info");

    /**
     * Parses one comma-separated line read from HDFS and emits it as an HBase {@link Put}.
     *
     * @param key     byte offset of the line within the input file (unique per line)
     * @param value   the raw CSV line
     * @param context MapReduce context used to emit the (row key, Put) pair
     * @throws IOException          on write failure
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the CSV line on commas.
        String[] values = value.toString().split(",");
        // A malformed line with fewer than 4 mandatory columns previously threw
        // ArrayIndexOutOfBoundsException and killed the task; skip it instead.
        if (values.length < 4) {
            return;
        }
        // Pull out the columns by position.
        String one = values[0];
        String two = values[1];
        String three = values[2];
        String four = values[3];
        // Some rows lack the fifth column; substitute a placeholder so the cell
        // is still written and the record is not lost.
        String five = (values.length < 5) ? "null" : values[4];

        // Row key: user id plus the line's byte offset in the input file.
        // The offset is unique per line and deterministic, unlike the previous
        // System.currentTimeMillis() suffix, which could collide for the same
        // user within one millisecond and silently overwrite rows.
        String rowKey = one + "_" + key.get();
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));

        Put put = new Put(Bytes.toBytes(rowKey));
        // addColumn(family, qualifier, value) replaces Put.add(...), which is
        // deprecated in HBase 1.x.
        put.addColumn(FAMILY, Bytes.toBytes("one"), Bytes.toBytes(one));
        put.addColumn(FAMILY, Bytes.toBytes("two"), Bytes.toBytes(two));
        put.addColumn(FAMILY, Bytes.toBytes("three"), Bytes.toBytes(three));
        put.addColumn(FAMILY, Bytes.toBytes("four"), Bytes.toBytes(four));
        put.addColumn(FAMILY, Bytes.toBytes("five"), Bytes.toBytes(five));
        context.write(rowKeyWritable, put);
    }
}
2.Reducer
package test1_HDFS2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class WriteReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Forwards every {@link Put} grouped under this row key straight into the
     * target HBase table; no aggregation is performed.
     *
     * @param rowKey  the row key produced by the mapper (unused beyond grouping)
     * @param puts    all mutations buffered for this key
     * @param context MapReduce context backed by the HBase table writer
     * @throws IOException          on write failure
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void reduce(ImmutableBytesWritable rowKey, Iterable<Put> puts, Context context) throws IOException, InterruptedException {
        for (Put mutation : puts) {
            context.write(NullWritable.get(), mutation);
        }
    }
}
3.Driver
package test1_HDFS2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import test1_HDFS.Txt2TestRunner;
import test1_HDFS.WriteTestMRFromTxtReducer;
import java.io.IOException;
public class Driver extends Configured implements Tool {

    /**
     * Configures and runs the HDFS-to-HBase load job.
     *
     * @param args optional; {@code args[0]} overrides the default HDFS input path
     * @return 0 on success, 1 on failure
     * @throws IOException            on job-submission I/O failure
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if the job is interrupted while running
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // HBase locates the cluster through ZooKeeper, so the client must be
        // pointed at the quorum before the job is built.
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "master,slave3,slave4");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.master", "master:60000");

        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(Driver.class);

        // Input path may be supplied on the command line; fall back to the
        // original hard-coded location for backward compatibility.
        Path inPath = new Path(args.length > 0
                ? args[0]
                : "hdfs://master:9000/user/hadoop/test/data.csv");
        FileInputFormat.addInputPath(job, inPath);

        // Mapper emits (row key, Put) pairs.
        job.setMapperClass(ReadHDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Bug fix: wire in this package's WriteReducer instead of
        // test1_HDFS.WriteTestMRFromTxtReducer, which belongs to a different
        // example and does not match this job's map output.
        TableMapReduceUtil.initTableReducerJob("data", WriteReducer.class, job);
        // At least one reducer is required to write into the table.
        job.setNumReduceTasks(1);

        // Report failure through the exit status rather than throwing, so
        // main() translates it into a non-zero process exit code.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Bug fix: run this Driver, not test1_HDFS.Txt2TestRunner, so the
        // job defined above is actually the one submitted.
        int status = ToolRunner.run(conf, new Driver(), args);
        System.exit(status);
    }
}
如果有错误,还请大佬批评指正,不胜感激!