使用 Java 多线程方式写入 HBase,速度比单线程快很多。
代码:
package com.test.transform;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.pansafe.hbase.HbaseUtil;
public class JavaToHbase {
private static final Logger logger = LoggerFactory.getLogger(JavaToHbase.class);
public static Configuration configuration = null;
public static Connection conn = null;
static {
configuration = HBaseConfiguration.create();// 获得配制文件对象
configuration.set("hbase.zookeeper.quorum", "IP地址");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("zookeeper.znode.parent", "/hbase");
ExecutorService threads = Executors.newFixedThreadPool(8);// 线程池,8线程插入
try {
conn = ConnectionFactory.createConnection(configuration, threads);
} catch (IOException e1) {
e1.printStackTrace();
}
}
/**
* 批量插入数据
*
* @return
* @throws ParseException
* @tableName 表名字
* @info 需要写入的信息
*/
public static void putList(String tableName, String info) throws IOException, ParseException {
try {
BufferedMutator table = conn.getBufferedMutator(TableName.valueOf(tableName));
try {
String[] lines = info.split("\t");
// lines[0] 银行卡号 ,lines[1] 交易时间戳
// String rowkeys = getRowkeyByUUId(lines[0], lines[1]);
String rowkeys = lines[0];
Put put = new Put(Bytes.toBytes(rowkeys));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("CRAD_NUMBER"), Bytes.toBytes(lines[0]));// 银行卡号
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("TRADING_TIME"), Bytes.toBytes(lines[1]));// 交易时间戳
List<Mutation> mutations = new ArrayList<Mutation>();
mutations.add(put);
table.mutate(mutations);
} finally {
if (table != null)
table.close();
}
} finally {
conn.close();
}
}
/**
* 生成随机码
*
*/
public static String getRowkeyByUUId(String CRAD_NUMBER, String TRADING_TIME_STAMP) throws ParseException {
String[] chars = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p",
"q", "r", "s", "t", "u", "v", "w", "x", "y", "z", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A",
"B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V",
"W", "X", "Y", "Z" };
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 19 - CRAD_NUMBER.length(); i++) {
sb.append("0");
}
sb.append(CRAD_NUMBER);
CRAD_NUMBER = sb.toString();
int machineId = new Random().nextInt(9) % (9 - 1 + 1) + 1;
StringBuffer shortBuffer = new StringBuffer();
String uuid = UUID.randomUUID().toString().replace("-", "");
for (int i = 0; i < 8; i++) {
String str = uuid.substring(i * 3, i * 3 + 4);
int x = Integer.parseInt(str, 16);
shortBuffer.append(chars[x % 0x3E]);
}
return machineId + shortBuffer.toString() + CRAD_NUMBER + TRADING_TIME_STAMP;
}
public static void main(String[] args) throws IOException, ParseException {
主函数调用
}
}