引入相关依赖
<artifactId>hbase</artifactId>
java代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class HBaseUtil {
private static Logger logger = LoggerFactory.getLogger(HBaseUtil.class);
private Configuration config;
private Connection connection;
public HBaseUtil() {
}
public HBaseUtil(Configuration config) {
this.config = config;
}
public Configuration getHbaseConfig() {
if (config == null) {
config = HBaseConfiguration.create();
}
return config;
}
public Connection getConnection() throws IOException {
if (connection == null) {
connection = ConnectionFactory.createConnection(getHbaseConfig());
}
return connection;
}
public Table getTable(String tableName) throws IOException {
if (!isTableExists(tableName)) {
throw new IllegalStateException("Table [" + tableName + "] is not exist");
}
return getConnection().getTable(TableName.valueOf(tableName));
}
public boolean isTableExists(String tableName) throws IOException {
return getConnection().getAdmin().tableExists(TableName.valueOf(tableName));
}
public void createTable(String tableName, String[] columnFamilys) throws IOException {
HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin();
if (admin.tableExists(TableName.valueOf(tableName))) {
throw new IllegalStateException("Table [" + tableName + "] is exist");
} else {
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String columnFamily : columnFamilys) {
descriptor.addFamily(new HColumnDescriptor(columnFamily));
}
admin.createTable(descriptor);
}
admin.close();
}
public void deleteTable(String tableName) throws IOException {
HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin();
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
admin.close();
}
public void saveOneRow(String tableName, Put put) throws IOException {
Table table = this.getTable(tableName);
table.put(put);
table.close();
}
public static void saveRows(String tableName, List<Put> puts) throws IOException {
Table table = getTable(HBaseConfiguration.create(),tableName);
table.put(puts);
table.close();
}
public void deleteOneRow(String tableName, String key) throws IOException {
Delete del = new Delete(Bytes.toBytes(key));
Table table = this.getTable(tableName);
table.delete(del);
table.close();
}
public void deleteOneData(String tableName, String rowKey, String family, String column) throws IOException {
Delete del = new Delete(rowKey.getBytes());
del.addColumn(family.getBytes(), column.getBytes());
Table table = this.getTable(tableName);
table.delete(del);
table.close();
}
public void save(String tableName, String rowKey, String family, String column, String data) throws IOException {
Put put = new Put(rowKey.getBytes());
put.addColumn(family.getBytes(), column.getBytes(), data.getBytes());
this.saveOneRow(tableName, put);
}
public void getByStartAndStopRow(String tableName, String startRow, String stopRow) throws IOException {
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(stopRow));
Table table = getTable(tableName);
ResultScanner results = null;
try {
results = table.getScanner(scan);
for (Result result : results) {
getRow(result);
}
} catch (IOException e) {
logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
} finally {
releaseHTableInterface(table);
if (null != results) {
results.close();
}
}
}
public List<Result> getResultsByFamilyAndRow(String tableName, String columnFalimy, String column, String startRow, String stopRow) throws IOException {
List<Result> rs = new ArrayList<Result>();
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes(columnFalimy), Bytes.toBytes(column));
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(stopRow));
Table table = getTable(tableName);
ResultScanner results = null;
try {
results = table.getScanner(scan);
for (Result result : results) {
rs.add(result);
}
} catch (IOException e) {
logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
} finally {
releaseHTableInterface(table);
results.close();
}
return rs;
}
public List<String> getValueByRows(String tableName, List<String> rowkeys) throws IOException {
List<String> rs = new ArrayList<String>();
List<Get> gets = new ArrayList<Get>();
for (String row : rowkeys) {
Get get = new Get(row.getBytes());
gets.add(get);
}
Table table = getTable(tableName);
Result[] results;
try {
results = table.get(gets);
for (Result result : results) {
for (Cell cell : result.rawCells()) {
System.out.print("RowName:" + new String(CellUtil.cloneRow(cell)) + " ");
System.out.print("Timetamp:" + cell.getTimestamp() + " ");
System.out.print("column Family:" + new String(CellUtil.cloneFamily(cell)) + " ");
System.out.print("column:" + new String(CellUtil.cloneQualifier(cell)) + " ");
System.out.println("value:" + new String(CellUtil.cloneValue(cell)) + " ");
rs.add(new String(CellUtil.cloneValue(cell)));
}
}
} catch (IOException e) {
logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
} finally {
releaseHTableInterface(table);
}
return rs;
}
public static boolean isTableExists(Configuration conf, String tableName) throws IOException {
return ConnectionFactory.createConnection(conf).getAdmin().tableExists(TableName.valueOf(tableName));
}
public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, Collection<byte[]> rowkeys)
throws IOException {
try {
Table tableSource = getTable(confSource, tableNameSource);
Table tableDest = getTable(confDest, tableNameDest);
for (byte[] row : rowkeys) {
System.out.println("-> Copy data start rowkey:" + row);
Get get = new Get(row);
Result result = tableSource.get(get);
if (result.isEmpty()) {
continue;
}
List<Put> puts = new ArrayList<Put>();
for (Cell cell : result.rawCells()) {
Put put = new Put(CellUtil.cloneRow(cell));
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
puts.add(put);
}
tableDest.put(puts);
System.out.println("=> Copy data end rowkey:" + row);
}
if (null != tableSource) {
tableSource.close();
}
if (null != tableDest) {
tableDest.close();
}
} catch (IOException e) {
logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
} finally {
}
logger.info("Copy data to " + tableNameDest + " finish.");
}
public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, String startRow, String endRow)
throws IOException {
try {
Table tableSource = getTable(confSource, tableNameSource);
Table tableDest = getTable(confDest, tableNameDest);
if(!isTableExists(confDest,tableNameDest)){
Connection connDest = ConnectionFactory.createConnection(confDest);
connDest.getAdmin().createTable(tableSource.getTableDescriptor());
logger.info("TableDest create finsh:"+tableDest.getName());
tableDest = getTable(confDest, tableNameDest);
}
logger.info("tableSource:"+tableSource.getName());
logger.info("Start to search data.");
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(endRow));
ResultScanner results = tableSource.getScanner(scan);
logger.info("Search data finsh.");
for (Result result:results) {
if (result.isEmpty()) {
continue;
}
List<Put> puts = new ArrayList<Put>();
for (Cell cell : result.rawCells()) {
Put put = new Put(CellUtil.cloneRow(cell));
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
puts.add(put);
}
tableDest.put(puts);
}
if (null != tableSource) {
tableSource.close();
}
if (null != tableDest) {
tableDest.close();
}
} catch (IOException e) {
e.printStackTrace();
logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
} finally {
}
logger.info("Copy data to " + tableNameDest + " finish.");
}
public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, List<String> rowkeys) throws IOException {
try {
List<Get> gets = new ArrayList<Get>();
for (String row : rowkeys) {
Get get = new Get(row.getBytes());
gets.add(get);
}
Table tableSource = getTable(confSource, tableNameSource);
Table tableDest = getTable(confDest, tableNameDest);
Result[] results = tableSource.get(gets);
for (Result result : results) {
List<Put> puts = new ArrayList<Put>();
for (Cell cell : result.rawCells()) {
Put put = new Put(CellUtil.cloneRow(cell));
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
puts.add(put);
}
tableDest.put(puts);
}
if (null != tableSource) {
tableSource.close();
}
if (null != tableDest) {
tableDest.close();
}
} catch (IOException e) {
logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
} finally {
}
}
private static Table getTable(Configuration conf, String tableName) throws IOException {
return ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));
}
public List<String> getList(String tableName, String startRow, String stopRow) throws IOException {
List<String> list = new ArrayList<String>();
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(stopRow));
Table table = getTable(tableName);
ResultScanner results = null;
try {
results = table.getScanner(scan);
for (Result result : results) {
for (Cell cell : result.rawCells()) {
list.add(new String(CellUtil.cloneValue(cell)));
}
}
} catch (IOException e) {
logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
} finally {
releaseHTableInterface(table);
if (null != results) {
results.close();
}
}
return list;
}
public void getList(String tableName, String rowKey) throws IOException {
List<String> list = new ArrayList<String>();
Get get = new Get(rowKey.getBytes());
Table table = getTable(tableName);
Result result = table.get(get);
for (Cell cell : result.listCells()) {
String columnName = new String(cell.getQualifierArray(), "UTF-8");
if (columnName.endsWith("CREATE_TIME") || columnName.endsWith("_RELEASE_TIME") || columnName.endsWith("UPDATE_TIME")) {
Long columnValue = Bytes.toLong(cell.getValueArray());
System.out.println(columnValue);
}
}
}
public void getRow(Result result) {
for (Cell cell : result.rawCells()) {
System.out.print("RowName:" + new String(CellUtil.cloneRow(cell)) + " ");
System.out.print("Timetamp:" + cell.getTimestamp() + " ");
System.out.print("column Family:" + new String(CellUtil.cloneFamily(cell)) + " ");
System.out.print("row Name:" + new String(CellUtil.cloneQualifier(cell)) + " ");
System.out.println("value:" + new String(CellUtil.cloneValue(cell)) + " ");
}
}
public void releaseHTableInterface(Table table) {
try {
if (null != table) {
table.close();
table = null;
}
} catch (IOException e) {
e.printStackTrace();
}
}
public List<String> getTableList() throws IOException{
List<String> tableNames = new ArrayList<String>();
Connection conn = getConnection();
for(TableName table :conn.getAdmin().listTableNames()){
tableNames.add(table.getNameAsString());
}
return tableNames;
}
public static void main(String[] args) {
try {
Configuration ratConfig = new Configuration();
ratConfig.set("hbase.zookeeper.quorum", "10.16.78.130,10.16.78.132");
Configuration oxConfig = new Configuration();
oxConfig.set("hbase.zookeeper.quorum", "10.16.78.133,10.16.78.135");
Configuration sssparkConfig = new Configuration();
sssparkConfig.set("hbase.zookeeper.quorum", "10.16.46.194,10.16.46.196,10.16.46.198");
Configuration e4offlineConfig = new Configuration();
e4offlineConfig.set("hbase.zookeeper.quorum", "172.16.59.131,172.16.59.132,172.16.59.133,172.16.59.134,172.16.59.135");
HBaseUtil hBaseUtil = new HBaseUtil(sssparkConfig);
System.out.println(hBaseUtil.getTableList());
} catch (Exception e) {
e.printStackTrace();
}
}
}