使用 HBase API 对数据库进行基本操作,包括创建表、写入数据、查询表数据等。
1、HBase配置
pom.xml
<!-- Apache HBase client library: supplies the org.apache.hadoop.hbase client API imported by HBaseService -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.1</version>
</dependency>
application.properties
# Injected into HBaseConfig and passed to HBase as "hbase.zookeeper.quorum"
HBase.nodes=10.xx.xx.43
# Injected into HBaseConfig and passed to HBase as "hbase.client.keyvalue.maxsize"
HBase.maxsize=500000
HBaseConfig.java
/**
 * Spring configuration that exposes an {@link HBaseService} bean built from
 * the {@code HBase.*} application properties.
 */
@Configuration
public class HBaseConfig {

    /** Value of {@code HBase.nodes}; handed to HBase as the ZooKeeper quorum. */
    @Value("${HBase.nodes}")
    private String nodes;

    /** Value of {@code HBase.maxsize}; handed to HBase as the client key-value max size. */
    @Value("${HBase.maxsize}")
    private String maxsize;

    /**
     * Assembles the HBase client configuration from the injected properties
     * and wraps it in an {@link HBaseService}.
     *
     * @return the service bean backed by the assembled configuration
     */
    @Bean
    public HBaseService getHbaseService() {
        final org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", nodes);
        hbaseConf.set("hbase.client.keyvalue.maxsize", maxsize);
        HBaseService service = new HBaseService(hbaseConf);
        return service;
    }
}
2、业务模拟
HBaseService.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import .....
/**
* HBase数据库基本操作
*/
public class HBaseService {
private Logger log = LoggerFactory.getLogger(HBaseService.class);
// The administrative API for HBase
// Admin can be used to create, drop, list, enable and disable and
// otherwise modify tables,
// as well as perform other administrative operations.
private Admin admin = null;
private Connection connection = null;
/**
 * Opens an HBase connection from the given configuration and obtains an
 * {@link Admin} from it.
 *
 * <p>An {@link IOException} during setup is logged, not rethrown, so the
 * instance is still constructed; in that case the fields assigned after the
 * failing call keep their {@code null} default.
 *
 * @param conf HBase client configuration used to create the connection
 */
public HBaseService(Configuration conf) {
    try {
        connection = ConnectionFactory.createConnection(conf);
        admin = connection.getAdmin();
    } catch (IOException e) {
        // Hand the exception to the logger so the root cause and stack trace
        // of the connection failure are not lost.
        log.error("获取HBase连接失败", e);
    }
}
/**
 * Creates a table with the given column families unless a table with that
 * name already exists.
 *
 * <p>Equivalent shell command: {@code create 'user', 'cf1'}
 *
 * @param tableName    name of the table to create
 * @param columnFamily names of the column families the table should have
 * @return {@code true} if the table already existed or was created;
 *         {@code false} if an {@link IOException} occurred (it is logged)
 */
public boolean creatTable(String tableName, List<String> columnFamily) {
    // A dedicated Admin is opened and closed per call. Closing the shared
    // instance here would leave later administrative calls operating on an
    // already-closed Admin.
    try (Admin tableAdmin = connection.getAdmin()) {
        // Column family descriptors
        List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
        for (String cf : columnFamily) {
            cfDesc.add(ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes(cf)).build());
        }
        // Table descriptor
        TableName name = TableName.valueOf(tableName);
        TableDescriptor tableDesc = TableDescriptorBuilder
                .newBuilder(name)
                .setColumnFamilies(cfDesc).build();
        if (tableAdmin.tableExists(name)) {
            log.debug("table Exists!");
        } else {
            tableAdmin.createTable(tableDesc);
            log.debug("create table Success!");
        }
    } catch (IOException e) {
        log.error(MessageFormat.format("创建表{0}失败", tableName), e);
        return false;
    }
    return true;
}
/**
 * Lists the names of all tables.
 *
 * <p>Equivalent shell command: {@code list}
 *
 * @return the table names as strings; empty if there are none or if an
 *         {@link IOException} occurred (it is logged)
 */
public List<String> getAllTableNames() {
    List<String> result = new ArrayList<>();
    // A dedicated Admin is opened and closed per call. Closing the shared
    // instance here would leave later administrative calls operating on an
    // already-closed Admin.
    try (Admin tableAdmin = connection.getAdmin()) {
        for (TableName tableName : tableAdmin.listTableNames()) {
            result.add(tableName.getNameAsString());
        }
    } catch (IOException e) {
        log.error("获取所有表的表名失败", e);
    }
    return result;
}
/**
 * Reads every row of the given table using an unrestricted scan.
 *
 * <p>Equivalent shell command: {@code scan 'user'}
 *
 * @param tableName name of the table to scan
 * @return row key mapped to that row's qualifier/value pairs
 */
public Map<String, Map<String, String>> getResultScanner(String tableName) {
    return queryData(tableName, new Scan());
}
/**
 * Runs the given scan against a table and collects the rows it returns.
 *
 * <p>Each row is keyed by column qualifier only, so cells that share a
 * qualifier across different column families overwrite one another in the
 * row's map. Rows and columns are kept in the order the scanner delivers them.
 *
 * @param tableName name of the table to query
 * @param scan      scan (with any filters) to execute
 * @return row key mapped to that row's qualifier/value pairs; contains only
 *         the rows read so far if an {@link IOException} occurred (it is logged)
 */
private Map<String, Map<String, String>> queryData(String tableName,
        Scan scan) {
    // <rowKey, row data>; insertion-ordered so the scanner's row order survives
    Map<String, Map<String, String>> result = new java.util.LinkedHashMap<>();
    ResultScanner rs = null;
    Table table = null;
    try {
        table = getTable(tableName);
        rs = table.getScanner(scan);
        for (Result r : rs) {
            // listCells() returns null for a Result without cells
            List<Cell> cells = r.listCells();
            if (cells == null) {
                continue;
            }
            // Data of one row
            Map<String, String> columnMap = new java.util.LinkedHashMap<>();
            String rowKey = null;
            // Row key, column family and qualifier together identify a cell
            for (Cell cell : cells) {
                if (rowKey == null) {
                    rowKey = Bytes.toString(cell.getRowArray(),
                            cell.getRowOffset(), cell.getRowLength());
                }
                columnMap.put(
                        // Column qualifier
                        Bytes.toString(cell.getQualifierArray(),
                                cell.getQualifierOffset(),
                                cell.getQualifierLength()),
                        // Cell value
                        Bytes.toString(cell.getValueArray(),
                                cell.getValueOffset(),
                                cell.getValueLength()));
            }
            if (rowKey != null) {
                result.put(rowKey, columnMap);
            }
        }
    } catch (IOException e) {
        log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}",
                tableName), e);
    } finally {
        close(null, rs, table);
    }
    return result;
}
/**
 * Fetches a single row by exact row key.
 *
 * @param tableName name of the table to read from
 * @param rowKey    key of the row to fetch
 * @return column qualifier mapped to cell value; empty if the row has no
 *         cells or if an {@link IOException} occurred (it is logged)
 */
public Map<String, String> getRowData(String tableName, String rowKey) {
    Map<String, String> rowValues = new HashMap<>();
    Get lookup = new Get(Bytes.toBytes(rowKey));
    Table target = null;
    try {
        target = getTable(tableName);
        Result row = target.get(lookup);
        boolean hasCells = row != null && !row.isEmpty();
        if (hasCells) {
            for (Cell c : row.listCells()) {
                String qualifier = Bytes.toString(c.getQualifierArray(),
                        c.getQualifierOffset(), c.getQualifierLength());
                String value = Bytes.toString(c.getValueArray(),
                        c.getValueOffset(), c.getValueLength());
                rowValues.put(qualifier, value);
            }
        }
    } catch (IOException e) {
        String message = MessageFormat.format(
                "查询一行的数据失败,tableName:{0},rowKey:{1}", tableName, rowKey);
        log.error(message, e);
    } finally {
        // Release the table handle whether or not the read succeeded
        close(null, null, target);
    }
    return rowValues;
}
/**
 * Inserts or updates the given columns of one row.
 *
 * <p>Any exception is logged rather than propagated to the caller.
 *
 * @param tableName  name of the table to write to
 * @param rowKey     key of the row to write
 * @param familyName column family that receives the columns
 * @param columns    column qualifiers
 * @param values     cell values, positionally matching {@code columns}
 */
public void putData(String tableName, String rowKey, String familyName,
        String[] columns, String[] values) {
    Table target = null;
    try {
        target = getTable(tableName);
        putData(target, rowKey, tableName, familyName, columns, values);
    } catch (Exception e) {
        String message = MessageFormat.format(
                "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
                tableName, rowKey, familyName);
        log.error(message, e);
    } finally {
        // Release the table handle whether or not the write succeeded
        close(null, null, target);
    }
}
/**
 * Writes the given columns of one row through an already-opened table.
 *
 * <p>The table is not closed here: the caller that opened it remains
 * responsible for closing it. Any failure, including invalid arguments, is
 * logged rather than propagated.
 *
 * @param table      open table to write to
 * @param rowKey     key of the row to write
 * @param tableName  table name, used for log messages only
 * @param familyName column family that receives the columns
 * @param columns    column qualifiers; must be non-empty with no null entry
 * @param values     cell values, positionally matching {@code columns}
 */
private void putData(Table table, String rowKey, String tableName,
        String familyName, String[] columns, String[] values) {
    try {
        // Reject unusable input up front instead of sending an empty Put
        if (columns == null || values == null
                || columns.length == 0
                || columns.length != values.length) {
            throw new IllegalArgumentException(
                    "列名数组和列数据数组不能为空且长度必须一致");
        }
        // Row key of the mutation
        Put put = new Put(Bytes.toBytes(rowKey));
        for (int i = 0; i < columns.length; i++) {
            if (columns[i] == null || values[i] == null) {
                throw new IllegalArgumentException(MessageFormat.format(
                        "列名和列数据都不能为空,column:{0},value:{1}", columns[i],
                        values[i]));
            }
            put.addColumn(Bytes.toBytes(familyName),
                    Bytes.toBytes(columns[i]),
                    Bytes.toBytes(values[i]));
        }
        table.put(put);
        log.debug("putData add or update data Success,rowKey:" + rowKey);
    } catch (Exception e) {
        log.error(MessageFormat.format(
                "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
                tableName, rowKey, familyName), e);
    }
}
/**
 * Obtains a {@link Table} handle for the named table from the shared
 * connection. A Table is used to get, put, delete or scan data in a single
 * HBase table.
 *
 * @param tableName name of the table
 * @return handle for the named table
 * @throws IOException if the connection cannot provide the table
 */
private Table getTable(String tableName) throws IOException {
    TableName name = TableName.valueOf(tableName);
    return connection.getTable(name);
}
/**
 * Closes whichever of the given resources are non-null, in the order
 * admin, scanner, table. A failure to close the admin or the table is logged
 * and does not stop the remaining resources from being closed.
 *
 * @param admin admin to close, or {@code null}
 * @param rs    result scanner to close, or {@code null}
 * @param table table to close, or {@code null}
 */
private void close(Admin admin, ResultScanner rs, Table table) {
    try {
        if (admin != null) {
            admin.close();
        }
    } catch (IOException e) {
        log.error("关闭Admin失败", e);
    }

    if (rs != null) {
        rs.close();
    }

    try {
        if (table != null) {
            table.close();
        }
    } catch (IOException e) {
        log.error("关闭Table失败", e);
    }
}
}
3、测试
HBaseTest.java
/**
 * Integration walkthrough for {@link HBaseService}: creates a table, writes
 * three rows, then reads them back by row key and by full scan.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class HBaseTest {

    @Autowired
    private HBaseService hbaseService;

    /**
     * Creates {@code test_base}, inserts three rows into family {@code f},
     * and prints the result of a single-row lookup and of a full scan.
     */
    @Test
    public void testCreateTable() {
        final String table = "test_base";
        final String family = "f";
        // Every row written below uses the same set of column qualifiers
        final String[] columns = { "project_id", "varName", "coefs",
                "pvalues", "tvalues", "create_time" };

        // Create the table
        hbaseService.creatTable(table, Arrays.asList("f", "back"));

        // Insert three rows
        hbaseService.putData(table, "66804_000001", family, columns,
                new String[] { "40866", "mob_3", "0.9416", "0.0000",
                        "12.2293", "null" });
        hbaseService.putData(table, "66804_000002", family, columns,
                new String[] { "40866", "idno_prov", "0.9317", "0.0000",
                        "9.8679", "null" });
        hbaseService.putData(table, "66804_000003", family, columns,
                new String[] { "40866", "education", "0.8984", "0.0000",
                        "25.5649", "null" });

        // Query 1: look up a single row by row key
        Map<String, String> byRowKey = hbaseService.getRowData(table,
                "66804_000001");
        System.out.println("+++++++++++根据rowKey查询+++++++++++");
        byRowKey.forEach((k, value) -> System.out.println(k + "---" + value));
        System.out.println();

        // Query 2: scan the whole table
        Map<String, Map<String, String>> scanned = hbaseService
                .getResultScanner(table);
        System.out.println("+++++++++++遍历查询+++++++++++");
        scanned.forEach((k, value) -> System.out.println(k + "---" + value));
    }
}
shell命令行查看表:
eclipse运行结果:
参考资料:https://www.zifangsky.cn/1286.html