Java多线程方式写入Hbase

Java多线程方式写入Hbase,速度比单线程快很多。

代码:

package com.test.transform;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pansafe.hbase.HbaseUtil;

public class JavaToHbase {
    private static final Logger logger = LoggerFactory.getLogger(JavaToHbase.class);
    public static Configuration configuration = null;
    public static Connection conn = null;
    static {
        configuration = HBaseConfiguration.create();// 获得配制文件对象
        configuration.set("hbase.zookeeper.quorum", "IP地址");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("zookeeper.znode.parent", "/hbase");
        ExecutorService threads = Executors.newFixedThreadPool(8);// 线程池,8线程插入

        try {
            conn = ConnectionFactory.createConnection(configuration, threads);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /**
     * 批量插入数据
     * 
     * @return
     * @throws ParseException
     * @tableName 表名字
     * @info 需要写入的信息
     */
    public static void putList(String tableName, String info) throws IOException, ParseException {

        try {
            BufferedMutator table = conn.getBufferedMutator(TableName.valueOf(tableName));
            try {
                String[] lines = info.split("\t");
                // lines[0] 银行卡号 ,lines[1] 交易时间戳
                // String rowkeys = getRowkeyByUUId(lines[0], lines[1]);
                String rowkeys = lines[0];
                Put put = new Put(Bytes.toBytes(rowkeys));
                put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("CRAD_NUMBER"), Bytes.toBytes(lines[0]));// 银行卡号
                put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("TRADING_TIME"), Bytes.toBytes(lines[1]));// 交易时间戳
                List<Mutation> mutations = new ArrayList<Mutation>();
                mutations.add(put);
                table.mutate(mutations);
            } finally {
                if (table != null)
                    table.close();
            }
        } finally {
            conn.close();
        }
    }

    /**
     * 生成随机码
     * 
     */
    public static String getRowkeyByUUId(String CRAD_NUMBER, String TRADING_TIME_STAMP) throws ParseException {

        String[] chars = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p",
                "q", "r", "s", "t", "u", "v", "w", "x", "y", "z", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A",
                "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V",
                "W", "X", "Y", "Z" };

        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 19 - CRAD_NUMBER.length(); i++) {
            sb.append("0");
        }
        sb.append(CRAD_NUMBER);
        CRAD_NUMBER = sb.toString();
        int machineId = new Random().nextInt(9) % (9 - 1 + 1) + 1;
        StringBuffer shortBuffer = new StringBuffer();
        String uuid = UUID.randomUUID().toString().replace("-", "");
        for (int i = 0; i < 8; i++) {
            String str = uuid.substring(i * 3, i * 3 + 4);
            int x = Integer.parseInt(str, 16);
            shortBuffer.append(chars[x % 0x3E]);
        }
        return machineId + shortBuffer.toString() + CRAD_NUMBER + TRADING_TIME_STAMP;
    }

    public static void main(String[] args) throws IOException, ParseException {
       主函数调用
    }
}

猜你喜欢

转载自blog.csdn.net/qq_28719873/article/details/81112314