Overview:
A business requirement called for importing data from Hive into HBase. I first wrote a version using the Java API, but testing showed it only holds up for small data sets: once the volume reaches millions or even hundreds of millions of rows, the import is far too slow. I therefore rewrote it with MapReduce. Without further ado, here it is.
1. Project setup
1.1 Project structure
The project is built with Maven; its structure is as follows:
bigdata-data-sync-hive2hbase
src
main
filters ---------------------------------- per-environment property files
dev.properties
online.properties
test.properties
java
com
bigdata
harvey
config -------------------- reads the properties file and wraps it in a Map
constants ----------------- shared constants
sync ---------------------- main program
utils --------------------- utility classes
resources
develop.properties
pom.xml ------------------------------------------- Maven configuration
1.2 pom.xml configuration
1.2.1 Project dependencies
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.6</version>
</dependency>
</dependencies>
1.2.2 Profile configuration
With profiles configured, the target environment can be selected under Profiles in IDEA's Maven Projects panel instead of editing code; the same can be done from the command line, as shown after the profile block below.
<profiles>
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<filters>
<filter>src/main/filters/dev.properties</filter>
</filters>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
</profile>
<profile>
<id>online</id>
<build>
<filters>
<filter>src/main/filters/online.properties</filter>
</filters>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
</profile>
<profile>
<id>test</id>
<build>
<filters>
<filter>src/main/filters/test.properties</filter>
</filters>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
</profile>
</profiles>
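A minimal command-line equivalent (assuming Maven is installed; the profile ids match the pom above, and dev is active by default):
mvn clean package -Ponline   # filter resources with online.properties
mvn clean package -Ptest     # filter resources with test.properties
mvn clean package            # no -P flag: the dev profile is activeByDefault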
1.2.3 Packaging plugins
<build>
<finalName>bigdata-data-sync-hive2hbase</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<mainClass>com.bigdata.harvey.Hive2HBaseMR</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.bigdata.harvey.Hive2HBaseMR</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1.2.4 Properties file configuration
The environments are split into development, test, and online. Each has its own properties file; the keys are identical and only the values differ. Taking dev.properties as an example (a sketch of the corresponding develop.properties follows the listing):
# ZooKeeper port
maven.hbase.zookeeper.property.clientPort=port
# ZooKeeper hosts, comma-separated
maven.hbase.zookeeper.quorum=ip1,ip2,ip3
# HBase master address
maven.hbase.master=ip:port
# When HMaster starts it registers the HBase system table -ROOT- in the ZooKeeper cluster, through which clients locate the region server that stores the .META. system table,
# so the znode path under which HBase keeps this location information must be configured; otherwise the job fails at startup because it cannot connect through ZooKeeper.
# If you do not know the value, look it up in hbase-site.xml; the default is /hbase.
maven.zookeeper.znode.parent=/hbase-unsecure
# Hive JDBC URL
maven.hive.url=jdbc:hive2://ip:port/
# Hive JDBC driver class
maven.hive.driver.class=org.apache.hive.jdbc.HiveDriver
# Hive username
maven.hive.user=hive
# Hive password
maven.hive.password=
# root HDFS path of the Hive warehouse
maven.hive.data.input.path=hdfs://ip:port/user/hive/
# phone numbers for SMS alerts on job failure, comma-separated
maven.warning.sms.mobile.list=phone
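The program itself reads src/main/resources/develop.properties (see ConfigureContext below), whose values are Maven filtering placeholders replaced at build time by the selected filter file. A minimal sketch of what it would look like, assuming the keys defined in PropertiesKeyConsts:
hbase.zookeeper.property.clientPort=${maven.hbase.zookeeper.property.clientPort}
hbase.zookeeper.quorum=${maven.hbase.zookeeper.quorum}
hbase.master=${maven.hbase.master}
zookeeper.znode.parent=${maven.zookeeper.znode.parent}
hive.url=${maven.hive.url}
hive.driver.class=${maven.hive.driver.class}
hive.user=${maven.hive.user}
hive.password=${maven.hive.password}
hive.data.input.path=${maven.hive.data.input.path}
warning.sms.mobile.list=${maven.warning.sms.mobile.list}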
2. Writing the code
2.1 Utility classes
- DateUtils.java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class DateUtils {
/**
* Returns today's date shifted by num days, formatted as yyyy-MM-dd.
*/
public static String getDate(int num) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
calendar.add(Calendar.DATE, num); // use the static field Calendar.DATE
return sdf.format(calendar.getTime());
}
public static void main(String[] args) {
System.out.println(getDate(-1)); // yesterday
}
}
- HiveJDBCUtils.java
import com.sqyc.bigdata.config.ConfigureContext;
import com.sqyc.bigdata.constants.Constants;
import com.sqyc.bigdata.constants.PropertiesKeyConsts;
import com.sqyc.bigdata.solar.kuiper.exception.SolarException;
import java.io.IOException;
import java.sql.*;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class HiveJDBCUtils {
private static Connection conn = null;
private static Connection getConnection(String url, String user, String password, String hiveDBName) {
try {
Class.forName(Constants.DRIVER_CLASS_NAME);
conn = DriverManager.getConnection(url + hiveDBName, user, password);
} catch (Exception e) {
throw new SolarException(e);
}
return conn;
}
/**
* Returns all column names of a Hive table, skipping the fields listed in ignoreFields.
* @param hiveDBName Hive database name
* @param warehouseTableName Hive table name
* @param ignoreFields comma-separated column names to skip (e.g. the partition field)
* @return List<String> list of column names
*/
public static List<String> loadTableColumn(String url, String user, String password, String hiveDBName, String warehouseTableName, String ignoreFields) {
List<String> columnList = new ArrayList<String>();
conn = getConnection(url, user, password, hiveDBName);
try {
PreparedStatement ps = conn.prepareStatement(Constants.DESC_TABLE + warehouseTableName);
ResultSet rs = ps.executeQuery();
while(rs.next()) {
if (startColumnsDescRow(rs)) {
while(rs.next()){
String colName = rs.getString(Constants.COL_NAME);
List<String> ignoreList = Arrays.asList(ignoreFields.split(","));
if (colName == null || colName.trim().equals("") || ignoreList.contains(colName)) {
continue;
} else if(colName.startsWith(Constants.SHARP)) {
break;
} else {
columnList.add(colName);
}
}
}
}
} catch (Exception e) {
throw new SolarException(e);
} finally {
close();
}
return columnList;
}
private static void close() {
if(conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
private static boolean startColumnsDescRow(ResultSet rs) throws SQLException {
String colName = rs.getString(Constants.COL_NAME);
return colName.trim().equals("# col_name");
}
public static void main(String[] args) {
ConfigureContext configureContext = ConfigureContext.getInstance();
Map<String, String> propMap = configureContext.getPropMap();
String url = propMap.get(PropertiesKeyConsts.HIVE_URL);
String user = propMap.get(PropertiesKeyConsts.HIVE_USER);
String password = propMap.get(PropertiesKeyConsts.HIVE_PASSWORD);
String hiveDBName = "dbName";
String warehouseTableName = "tableName";
String ignoreFields = "dt";
List<String> columnList = HiveJDBCUtils.loadTableColumn(url, user, password, hiveDBName, warehouseTableName, ignoreFields);
for (String column: columnList) {
System.out.println(column);
}
}
}
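For reference, loadTableColumn depends on the layout of DESC FORMATTED output. In Hive 1.2 it looks roughly like the sketch below (spacing and sections vary by version): the column rows follow the first '# col_name' header row and end at the next row starting with '#'. Note that the partition column appears again under '# Partition Information', which is why it should be passed in ignoreFields.
# col_name              data_type               comment

id                      string
name                    string
...
# Partition Information
# col_name              data_type               comment
dt                      string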
- MD5Utils.java
import java.security.MessageDigest;
public class MD5Utils {
/***
* MD5 hash: produces a 32-character hex digest
*/
public static String string2MD5(String inStr){
MessageDigest md5 = null;
try{
md5 = MessageDigest.getInstance("MD5");
}catch (Exception e){
System.out.println(e.toString());
e.printStackTrace();
return "";
}
char[] charArray = inStr.toCharArray();
byte[] byteArray = new byte[charArray.length];
for (int i = 0; i < charArray.length; i++)
byteArray[i] = (byte) charArray[i];
byte[] md5Bytes = md5.digest(byteArray);
StringBuffer hexValue = new StringBuffer();
for (int i = 0; i < md5Bytes.length; i++){
int val = ((int) md5Bytes[i]) & 0xff;
if (val < 16)
hexValue.append("0");
hexValue.append(Integer.toHexString(val));
}
return hexValue.toString();
}
public static void main(String args[]) {
String str = "20497";
System.out.println(string2MD5(str));
}
}
2.2 Shared classes
- Constants.java
public class Constants {
public static final String DRIVER_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver";
public static final String DESC_TABLE = "DESC FORMATTED ";
public static final String COL_NAME = "col_name";
public static final String SHARP = "#";
public static final String PARTITION_PREFIX = "dt=";
public static final String HBASE_COLUMN_FAMILY = "info";
public static final char FIELDS_TERMINATED = '\001';
// ------------------------------------------------------ constant ------------------------------------------------------
public static final String COMM_SLASH = "/";
public static final String DB = "db";
public static final String COMM_POINT = ".";
public static final String COMMA = ",";
}
- PropertiesKeyConsts.java
public class PropertiesKeyConsts {
// ----------------------------------------------------- develop.properties ------------------------------------------------------
public static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
public static final String HBASE_MASTER = "hbase.master";
public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
public static final String HIVE_URL = "hive.url";
public static final String HIVE_DRIVER_CLASS = "hive.driver.class";
public static final String HIVE_USER = "hive.user";
public static final String HIVE_PASSWORD = "hive.password";
public static final String HIVE_DATA_INPUT_PATH = "hive.data.input.path";
public static final String SMS_MOBILE_LIST = "warning.sms.mobile.list";
}
2.3 Loading the configuration
- ConfigureContext.java
public class ConfigureContext {
private Map<String, String> propMap = null;
private static ConfigureContext instance = new ConfigureContext();
private ConfigureContext() {
this.initConfig();
}
public static ConfigureContext getInstance() {
return instance;
}
private void initConfig() {
propMap = new HashMap<String, String>();
ResourceBundle bundle = ResourceBundle.getBundle("develop");
propMap.put(PropertiesKeyConsts.ZOOKEEPER_CLIENT_PORT, bundle.getString(PropertiesKeyConsts.ZOOKEEPER_CLIENT_PORT));
propMap.put(PropertiesKeyConsts.ZOOKEEPER_QUORUM, bundle.getString(PropertiesKeyConsts.ZOOKEEPER_QUORUM));
propMap.put(PropertiesKeyConsts.HBASE_MASTER, bundle.getString(PropertiesKeyConsts.HBASE_MASTER));
propMap.put(PropertiesKeyConsts.ZOOKEEPER_ZNODE_PARENT, bundle.getString(PropertiesKeyConsts.ZOOKEEPER_ZNODE_PARENT));
propMap.put(PropertiesKeyConsts.HIVE_URL, bundle.getString(PropertiesKeyConsts.HIVE_URL));
propMap.put(PropertiesKeyConsts.HIVE_DRIVER_CLASS, bundle.getString(PropertiesKeyConsts.HIVE_DRIVER_CLASS));
propMap.put(PropertiesKeyConsts.HIVE_USER, bundle.getString(PropertiesKeyConsts.HIVE_USER));
propMap.put(PropertiesKeyConsts.HIVE_PASSWORD, bundle.getString(PropertiesKeyConsts.HIVE_PASSWORD));
propMap.put(PropertiesKeyConsts.HIVE_DATA_INPUT_PATH, bundle.getString(PropertiesKeyConsts.HIVE_DATA_INPUT_PATH));
propMap.put(PropertiesKeyConsts.SMS_MOBILE_LIST, bundle.getString(PropertiesKeyConsts.SMS_MOBILE_LIST));
}
public Map<String, String> getPropMap() {
return propMap;
}
}
2.4 MapReduce
import com.sqyc.bigdata.config.ConfigureContext;
import com.sqyc.bigdata.constants.Constants;
import com.sqyc.bigdata.constants.PropertiesKeyConsts;
import com.sqyc.bigdata.solar.kuiper.util.SMSMessageSender;
import com.sqyc.bigdata.utils.DateUtils;
import com.sqyc.bigdata.utils.HiveJDBCUtils;
import com.sqyc.bigdata.utils.MD5Utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class Hive2HBaseMR {
public static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private List<String> columnList;
private int errLineNo;
private String errMsg;
private String hiveDBName;
private String hiveTableName;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.errLineNo = 0;
this.errMsg = null;
Configuration cfg = context.getConfiguration();
this.hiveDBName = cfg.get("hiveDBName");
this.hiveTableName = cfg.get("hiveTableName");
columnList = HiveJDBCUtils.loadTableColumn(cfg.get("url"), cfg.get("user"), cfg.get("password"), hiveDBName, hiveTableName, cfg.get("ignoreFields"));
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
List<String> valueList = this.split(value.toString(), Constants.FIELDS_TERMINATED);
byte[] rowkey = MD5Utils.string2MD5(valueList.get(0)).getBytes();
byte[] columnFamily = Constants.HBASE_COLUMN_FAMILY.getBytes();
Put put = new Put(rowkey);
if (!columnList.isEmpty()) {
for (int i = 0; i < columnList.size(); i++) {
put.addColumn(columnFamily, columnList.get(i).getBytes(), valueList.get(i).getBytes());
}
}
context.write(new ImmutableBytesWritable(rowkey), put);
} catch (Exception e) {
e.printStackTrace();
this.errLineNo++;
errMsg = "Failed to sync data from Hive to HBase, please check! [hiveDBName = " + hiveDBName + ", hiveTableName=" + hiveTableName + "]";
throw new RuntimeException(errMsg);
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
if (this.errLineNo > 0) {
throw new RuntimeException(errMsg);
}
}
/**
* Splits a raw line while preserving empty fields. For example, for a\t\tb\t\t\t the default String.split
* drops trailing empty strings, so columns would go missing once the row is written to HBase; this method
* keeps every empty field, so the three trailing delimiters yield three empty columns.
* @param str one line of data
* @param spliter field delimiter, matching the field delimiter of the Hive table
*/
private List<String> split(String str, char spliter){
List<String> res = new ArrayList<String>();
int i = 0;
if(str.startsWith(String.valueOf(spliter))){
res.add("");
}
StringBuilder buffer = new StringBuilder();
while(i < str.length()){
if(str.charAt(i) == spliter){
if(i != 0){
res.add(buffer.toString());
}
buffer.delete(0, buffer.length());
}else{
buffer.append(str.charAt(i));
}
i++;
}
if(buffer.length() > 0){
res.add(buffer.toString());
}
if(str.endsWith(String.valueOf(spliter))){
res.add("");
}
return res;
}
}
/**
* Builds the HDFS path holding the Hive data for one partition.
* @param dataInputPath root path of the Hive warehouse (e.g. hdfs://ip:port/user/hive/)
* @param hiveDBName database name
* @param hiveTableName table name
* @param partitionDate partition date
* @return String full HDFS path, e.g. hdfs://ip:port/user/hive/dbName.db/tableName/dt=2018-10-20
*/
private static String getHDFSPath(String dataInputPath, String hiveDBName, String hiveTableName, String partitionDate) {
StringBuffer buffer = new StringBuffer();
buffer.append(dataInputPath);
buffer.append(hiveDBName);
buffer.append(Constants.COMM_POINT);
buffer.append(Constants.DB);
buffer.append(Constants.COMM_SLASH);
buffer.append(hiveTableName);
buffer.append(Constants.COMM_SLASH);
buffer.append(Constants.PARTITION_PREFIX);
if (StringUtils.isEmpty(partitionDate)) {
partitionDate = DateUtils.getDate(-1); // default to yesterday's partition
}
buffer.append(partitionDate);
return buffer.toString();
}
/**
* Creates the HBase table if it does not exist yet.
* @param conf HBase Configuration object
* @param hbaseTableName table name
*/
private static void createTable(Configuration conf, String hbaseTableName) throws Exception {
Connection conn = ConnectionFactory.createConnection(conf);
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
TableName tableName = TableName.valueOf(hbaseTableName);
if (!admin.tableExists(tableName)) { // create the table only when it does not exist
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor columnDesc = new HColumnDescriptor(Constants.HBASE_COLUMN_FAMILY);
desc.addFamily(columnDesc);
admin.createTable(desc);
}
}
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: hadoop jar xxx.jar hiveDBName hiveTableName ignoreFields [partitionDate] ^_^");
System.exit(-1);
}
String hiveDBName = args[0];
String hiveTableName = args[1];
String ignoreFields = args[2];
String partitionDate = null;
if (args.length == 4) {
partitionDate = args[3];
}
ConfigureContext configureContext = ConfigureContext.getInstance();
Map<String, String> propMap = configureContext.getPropMap();
String url = propMap.get(PropertiesKeyConsts.HIVE_URL);
String user = propMap.get(PropertiesKeyConsts.HIVE_USER);
String password = propMap.get(PropertiesKeyConsts.HIVE_PASSWORD);
String dataInputPath = getHDFSPath(propMap.get(PropertiesKeyConsts.HIVE_DATA_INPUT_PATH), hiveDBName, hiveTableName, partitionDate);
String hbaseZKClientPort = propMap.get(PropertiesKeyConsts.ZOOKEEPER_CLIENT_PORT);
String hbaseZKQuorum = propMap.get(PropertiesKeyConsts.ZOOKEEPER_QUORUM);
String hmaster = propMap.get(PropertiesKeyConsts.HBASE_MASTER);
String zkZnodeParent = propMap.get(PropertiesKeyConsts.ZOOKEEPER_ZNODE_PARENT);
String mobiles = propMap.get(PropertiesKeyConsts.SMS_MOBILE_LIST);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort", hbaseZKClientPort);
conf.set("hbase.zookeeper.quorum", hbaseZKQuorum);
conf.set("hbase.master", hmaster);
conf.set("zookeeper.znode.parent", zkZnodeParent);
// pass job parameters to the mappers via the Configuration
conf.set("hiveDBName", hiveDBName);
conf.set("hiveTableName", hiveTableName);
conf.set("ignoreFields", ignoreFields);
conf.set("url", url);
conf.set("user", user);
conf.set("password", password);
// create the HBase table (named after the Hive table)
createTable(conf, hiveTableName);
Job job = Job.getInstance(conf, "hive2hbase-" + hiveTableName);
job.setJarByClass(Hive2HBaseMR.class);
job.setMapperClass(ImportMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
FileInputFormat.addInputPath(job, new Path(dataInputPath));
TableMapReduceUtil.initTableReducerJob(
hiveTableName, // hbase tableName
null, // reducer class
job);
boolean flag = job.waitForCompletion(true);
if (!flag) {
String errMsg = "Failed to sync data from Hive to HBase, please check! [hiveDBName = " + hiveDBName + ", hiveTableName=" + hiveTableName + "]";
// on failure, send an SMS alert with errMsg as the message body
System.exit(-1);
} else {
System.exit(0);
}
}
}
3. Running the job
Package the code above into a jar and run:
hadoop jar jarName hiveDBName hiveTableName ignoreFields [partitionDate]
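For example, to import yesterday's partition of a hypothetical table user_info in database ods while skipping the partition field dt (the jar name below comes from the pom's finalName plus the assembly plugin's jar-with-dependencies suffix; adjust it to your actual build output):
hadoop jar bigdata-data-sync-hive2hbase-jar-with-dependencies.jar ods user_info dt
hadoop jar bigdata-data-sync-hive2hbase-jar-with-dependencies.jar ods user_info dt 2018-10-20
After the job finishes, the result can be checked in the HBase shell, e.g. scan 'user_info', {LIMIT => 1} (the HBase table name equals the Hive table name and the column family is info).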