HBase 连接工具类,以及基本 table 操作和实体转换(convert)工具类
hbaseUtils
package com.feifan.data.utils;
import com.feifan.data.model.CrawlerHbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@DependsOn("springContextHolder") // make sure SpringContextHolder is loaded before this bean
@Component
public class HBaseUtils {

    private static final Logger logger = LoggerFactory.getLogger(HBaseUtils.class);

    private static Environment environment;
    private static Configuration conf = HBaseConfiguration.create();
    // Thread pool backing the single shared Connection. HBase Connections are
    // heavyweight and thread-safe; one per JVM is the recommended pattern.
    private static ExecutorService pool = Executors.newScheduledThreadPool(20);
    private static Connection connection = null;
    private static HBaseUtils instance = null;
    private static Admin admin = null;

    /**
     * Spring-injected constructor: copies the ZooKeeper settings from the
     * environment into the client configuration and opens the shared HBase
     * connection exactly once.
     *
     * @param env Spring environment holding hbase.zookeeper.* properties
     */
    @Autowired
    private HBaseUtils(Environment env) {
        HBaseUtils.environment = env;
        if (connection == null) {
            try {
                String hbaseQuorum = environment.getProperty("hbase.zookeeper.quorum");
                String clientPort = environment.getProperty("hbase.zookeeper.property.clientPort");
                String znodeParent = environment.getProperty("zookeeper.znode.parent");
                conf.set("hbase.zookeeper.quorum", hbaseQuorum);
                conf.set("hbase.zookeeper.property.clientPort", clientPort);
                conf.set("zookeeper.znode.parent", znodeParent);
                connection = ConnectionFactory.createConnection(conf, pool);
                admin = connection.getAdmin();
            } catch (IOException e) {
                logger.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);
            }
        }
        // Expose the container-created bean to non-Spring callers; the original
        // code never assigned this, so getInstance() always returned null.
        instance = this;
    }

    /**
     * Singleton accessor for callers outside the Spring container.
     *
     * @return the instance built by Spring, or null if the bean has not been
     *         constructed yet
     */
    public static synchronized HBaseUtils getInstance() {
        return instance;
    }

    /**
     * Creates a table, dropping any existing table with the same name first
     * (the original code deleted the old table but then never created the new
     * one, because the create call sat in the else branch).
     *
     * @param tableName    table name
     * @param columnFamily column families to create
     */
    public void createTable(String tableName, String[] columnFamily) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            // Drop the old table so it can be recreated with the requested families.
            admin.disableTable(name);
            admin.deleteTable(name);
            logger.warn("table {} already existed and was dropped before re-creation", name);
        }
        HTableDescriptor desc = new HTableDescriptor(name);
        for (String cf : columnFamily) {
            desc.addFamily(new HColumnDescriptor(cf));
        }
        admin.createTable(desc);
    }

    /**
     * Inserts one row: single column family, multiple columns/values.
     * {@code columns} and {@code values} must correspond index-wise.
     *
     * @param tableName     table name
     * @param row           row key
     * @param columnFamilys column family
     * @param columns       column qualifiers
     * @param values        values, parallel to columns
     */
    public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Table table = connection.getTable(name)) {
            Put put = new Put(Bytes.toBytes(row));
            for (int i = 0; i < columns.length; i++) {
                put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
            }
            // Single RPC for the whole row; the original issued one put per column.
            table.put(put);
        }
    }

    /**
     * Batch lookup by a list of row keys in one RPC.
     *
     * @param tableName  table name
     * @param rowkeyList row keys to fetch
     * @return results in the same order as the requested keys
     */
    public static List<Result> qurryTableBatchByRowKey(String tableName, List<String> rowkeyList) throws IOException {
        List<Get> getList = new ArrayList<>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            for (String rowkey : rowkeyList) {
                Get get = new Get(Bytes.toBytes(rowkey));
                get.setId(rowkey);
                getList.add(get);
            }
            Result[] results = table.get(getList);
            return Arrays.asList(results);
        }
    }

    /**
     * Inserts one row built from a bean via reflection: each non-null, non-blank
     * readable property becomes a column (qualifier = property name) in the
     * given column family.
     *
     * @param tableName table name
     * @param rowkey    row key
     * @param column    column family
     * @param pojo      bean whose readable properties are written
     */
    public void newInsertRecords(String tableName, String rowkey, String column, Object pojo) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Table table = connection.getTable(name)) {
            PropertyDescriptor[] pds = BeanUtils.getPropertyDescriptors(pojo.getClass());
            BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(pojo);
            Put put = new Put(Bytes.toBytes(rowkey));
            for (PropertyDescriptor propertyDescriptor : pds) {
                String properName = propertyDescriptor.getName();
                String value;
                try {
                    Object raw = beanWrapper.getPropertyValue(properName);
                    // Skip nulls and the synthetic Object#getClass property.
                    if (raw == null || "class".equals(properName)) {
                        continue;
                    }
                    value = String.valueOf(raw);
                    logger.debug("properName: {} = {}", properName, value);
                } catch (BeansException e) {
                    logger.warn("failed to read property {}", properName, e);
                    continue;
                }
                if (!StringUtils.isBlank(value)) {
                    put.addColumn(Bytes.toBytes(column), Bytes.toBytes(properName), Bytes.toBytes(value));
                }
            }
            table.put(put);
        }
    }

    /**
     * Inserts several beans, one Put per bean, all under the same row key and
     * column family (later beans overwrite identically-named columns).
     *
     * @param tableName table name
     * @param rowkey    row key shared by every Put
     * @param column    column family
     * @param pojo      beans to write
     * @param tClass    bean class used to enumerate properties
     */
    public <T> void newInsertList(String tableName, String rowkey, String column, List pojo, Class<T> tClass) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Table table = connection.getTable(name)) {
            // Property descriptors depend only on the class; compute them once.
            PropertyDescriptor[] pds = BeanUtils.getPropertyDescriptors(tClass);
            for (Object o : pojo) {
                BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(o);
                Put put = new Put(Bytes.toBytes(rowkey));
                for (PropertyDescriptor propertyDescriptor : pds) {
                    String properName = propertyDescriptor.getName();
                    String value;
                    try {
                        Object raw = beanWrapper.getPropertyValue(properName);
                        if (raw == null || "class".equals(properName)) {
                            continue;
                        }
                        value = String.valueOf(raw);
                    } catch (BeansException e) {
                        logger.warn("failed to read property {}", properName, e);
                        continue;
                    }
                    if (!StringUtils.isBlank(value)) {
                        put.addColumn(Bytes.toBytes(column), Bytes.toBytes(properName), Bytes.toBytes(value));
                    }
                }
                table.put(put);
            }
        }
    }

    /**
     * Inserts a single cell: one row, one column family, one column, one value.
     *
     * @param tableName    table name
     * @param row          row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        value
     */
    public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Table table = connection.getTable(name)) {
            Put put = new Put(Bytes.toBytes(row));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
        }
    }

    /**
     * Deletes an entire row.
     *
     * @param tableName table name
     * @param rowKey    row key
     */
    public void deleteRow(String tableName, String rowKey) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Table table = connection.getTable(name)) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
        }
    }

    /**
     * Deletes one column family of a row.
     *
     * @param tableName    table name
     * @param rowkey       row key
     * @param columnFamily column family to delete
     */
    public void deleteColumnFamily(String tableName, String rowkey, String columnFamily) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Table table = connection.getTable(name)) {
            // addFamily replaces the deprecated deleteFamily; same semantics.
            Delete d = new Delete(Bytes.toBytes(rowkey)).addFamily(Bytes.toBytes(columnFamily));
            table.delete(d);
        }
    }

    /**
     * Deletes a single cell (latest version) of a row.
     *
     * @param tableName    table name
     * @param rowkey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     */
    public void deleteColumn(String tableName, String rowkey, String columnFamily, String column) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Table table = connection.getTable(name)) {
            Delete d = new Delete(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
            table.delete(d);
        }
    }

    /**
     * Fetches one row and renders every cell as a tab-separated
     * "row \t family \t qualifier \t value" line.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return all cells of the row, one per line; empty string if none
     */
    public static String selectRow(String tableName, String rowKey) throws IOException {
        StringBuilder record = new StringBuilder();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get g = new Get(Bytes.toBytes(rowKey));
            Result rs = table.get(g);
            for (Cell cell : rs.rawCells()) {
                // CellUtil.clone* replaces the deprecated Cell.getRow()/getFamily()/... accessors.
                record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
                        .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
                        .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
                        .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
            }
        }
        return record.toString();
    }

    /**
     * Fetches a single cell value.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @return the cell value as a string, or null if the cell does not exist
     */
    public static String selectValue(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get g = new Get(Bytes.toBytes(rowKey));
            g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
            Result rs = table.get(g);
            return Bytes.toString(rs.value());
        }
    }

    /**
     * Full-table scan rendering every cell as a tab-separated line
     * (row \t family \t qualifier \t value). Intended for debugging only —
     * this materializes the whole table as one string.
     *
     * @param tableName table name
     * @return every cell of the table, one per line
     */
    public String scanByTableName(String tableName) throws IOException {
        StringBuilder record = new StringBuilder();
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
                            .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
                            .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
                            .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
                }
            }
        }
        return record.toString();
    }

    /**
     * Scans the half-open row-key range [start, end).
     *
     * @param tableName table name
     * @param start     inclusive start row key
     * @param end       exclusive end row key
     * @return all rows in the range
     */
    public List<Result> scanByStartEnd(String tableName, String start, String end) throws IOException {
        List<Result> scanResult = new ArrayList<Result>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan(Bytes.toBytes(start), Bytes.toBytes(end));
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    scanResult.add(result);
                }
            }
        }
        return scanResult;
    }

    /**
     * Scans rows whose row key contains the given keyword (substring match).
     *
     * @param tableName  table name
     * @param rowKeyword substring to match against row keys
     * @return matching rows (as raw Results; customize per business needs)
     */
    public List<Object> scanByRowKeyword(String tableName, String rowKeyword) throws IOException {
        ArrayList<Object> list = new ArrayList<Object>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            // Row-key filter: keep rows whose key contains the keyword.
            scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword)));
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    // TODO customize result mapping per business needs
                    list.add(result);
                }
            }
        }
        return list;
    }

    /**
     * Scans rows whose row key contains the keyword, restricted to cells with
     * timestamps in [minStamp, maxStamp).
     *
     * @param tableName  table name
     * @param rowKeyword substring to match against row keys
     * @param minStamp   inclusive minimum timestamp
     * @param maxStamp   exclusive maximum timestamp
     * @return matching rows (as raw Results; customize per business needs)
     */
    public List<Object> scanByRowKeywordTimestamp(String tableName, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {
        ArrayList<Object> list = new ArrayList<Object>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            scan.setTimeRange(minStamp, maxStamp);
            scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword)));
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    // TODO customize result mapping per business needs
                    list.add(result);
                }
            }
        }
        return list;
    }

    /**
     * Drops a table if it exists (disable, then delete); no-op otherwise.
     *
     * @param tableName table name
     */
    public void deleteTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            admin.disableTable(name);
            admin.deleteTable(name);
        }
    }

    /**
     * Counts all rows of a table using the server-side aggregation coprocessor,
     * installing the coprocessor on the table first if it is missing (this
     * requires a disable/enable cycle).
     *
     * @param tableName table name
     * @return total row count
     */
    public Long countRowsWithCoprocessor(String tableName) throws Throwable {
        TableName name = TableName.valueOf(tableName);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (!descriptor.hasCoprocessor(coprocessorClass)) {
            admin.disableTable(name);
            descriptor.addCoprocessor(coprocessorClass);
            admin.modifyTable(name, descriptor);
            admin.enableTable(name);
        }
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        Scan scan = new Scan();
        AggregationClient aggregationClient = new AggregationClient(conf);
        try {
            Long count = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);
            stopWatch.stop();
            logger.info("RowCount: {}, full-table count took {} ms", count, stopWatch.getTotalTimeMillis());
            return count;
        } finally {
            // AggregationClient holds its own Connection; release it.
            aggregationClient.close();
        }
    }
}
springContextHolder (Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean)
/**
*
*/
package com.feifan.data.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean
* @author chenwen
*
*/
@Component
public class SpringContextHolder implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    /**
     * Spring callback: captures the ApplicationContext in a static field so
     * that beans can be resolved from static code anywhere in the application.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextHolder.applicationContext = applicationContext;
    }

    /**
     * Returns the held ApplicationContext, failing fast if this holder was
     * never registered with the container.
     */
    public static ApplicationContext getApplicationContext() {
        assertApplicationContext();
        return applicationContext;
    }

    /**
     * Resolves a bean by name.
     *
     * @param beanName the bean's registered name
     * @return the bean, cast to the caller's expected type
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String beanName) {
        assertApplicationContext();
        Object bean = applicationContext.getBean(beanName);
        return (T) bean;
    }

    /**
     * Resolves a bean by its type.
     *
     * @param requiredType the bean's class
     * @return the matching bean
     */
    public static <T> T getBean(Class<T> requiredType) {
        assertApplicationContext();
        return applicationContext.getBean(requiredType);
    }

    // Guard shared by every accessor: reject use before context injection.
    private static void assertApplicationContext() {
        if (applicationContext == null) {
            throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
        }
    }
}
HBaseConvetorUtil实体类转换工具(hbase 列族转 javabean)
package com.feifan.data.utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author: craywen
* @date: 2020-07-02 11:09
* @desc: 实体转换工具类
*/
/**
 * @author: craywen
 * @date: 2020-07-02 11:09
 * @desc: Converts HBase query results into Java bean instances via reflection.
 */
public class HBaseConvetorUtil {

    // Shared formatter. SimpleDateFormat is NOT thread-safe, so every use below
    // synchronizes on it. Pattern fixed from "hh" (12-hour clock, which silently
    // mangled afternoon times) to "HH" (24-hour clock).
    public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * @Title: convetor
     * @Description: Converts every row of a scanner into an instance of {@code cla}.
     * Values are read from the "data" family with the UPPER-CASE field name as
     * the qualifier and assigned as plain strings (delegates to Tconvetor).
     */
    public static <T> List<T> convetor(Class<T> cla, ResultScanner resultScanner) throws Exception {
        List<T> list = new ArrayList<T>();
        for (Result result : resultScanner) {
            list.add(Tconvetor(cla, result));
        }
        return list;
    }

    /**
     * Converts a list of Results into bean instances. Qualifiers use the field
     * name as-is; the row key is mapped onto an "id" field, and rows with a
     * blank or literal "null" row key are skipped entirely. Supported field
     * types: String, int/Integer and java.util.Date.
     *
     * @param cla     bean class (must have a no-arg constructor)
     * @param results HBase rows to convert
     * @return one bean per usable row
     * @throws Exception on reflection or date-parse failure
     */
    public static <T> List<T> convetor(Class<T> cla, List<Result> results) throws Exception {
        List<T> list = new ArrayList<T>();
        byte[] family = Bytes.toBytes("data");
        for (Result result : results) {
            boolean skip = false;
            Field[] fields = cla.getDeclaredFields();
            T t = cla.newInstance();
            for (Field field : fields) {
                field.setAccessible(true);
                String fieldName = field.getName();
                // The row key carries the id; an unusable key drops the row.
                if (fieldName.equals("id")) {
                    String id = Bytes.toString(result.getRow());
                    if (StringUtils.isBlank(id) || id.equals("null")) {
                        skip = true;
                        break;
                    }
                    field.set(t, id);
                    continue;
                }
                byte[] qualifier = Bytes.toBytes(fieldName);
                if (!result.containsColumn(family, qualifier)) {
                    continue;
                }
                byte[] raw = result.getValue(family, qualifier);
                if (raw.length == 0) {
                    continue;
                }
                String value = Bytes.toString(raw);
                String type = String.valueOf(field.getGenericType());
                if (type.equals("int") || type.equals("class java.lang.Integer")) {
                    field.set(t, Integer.valueOf(value));
                } else if (type.equals("class java.util.Date")) {
                    // The original called sdf.format(value) on a String, which
                    // always throws IllegalArgumentException; parse instead.
                    synchronized (sdf) {
                        field.set(t, sdf.parse(value));
                    }
                } else if (type.equals("class java.lang.String")) {
                    field.set(t, value);
                }
            }
            if (!skip) {
                list.add(t);
            }
        }
        return list;
    }

    /**
     * @Title: convetor
     * @Description: Converts a single Result into an instance of {@code cla}.
     * Qualifiers are the UPPER-CASE field names in the "data" family; only
     * String assignment is performed.
     */
    public static <T> T Tconvetor(Class<T> cla, Result result) throws Exception {
        byte[] family = Bytes.toBytes("data");
        T t = cla.newInstance();
        for (Field field : cla.getDeclaredFields()) {
            field.setAccessible(true);
            byte[] qualifier = Bytes.toBytes(field.getName().toUpperCase());
            if (result.containsColumn(family, qualifier)) {
                byte[] raw = result.getValue(family, qualifier);
                if (raw.length > 0) {
                    field.set(t, Bytes.toString(raw));
                }
            }
        }
        return t;
    }
}
pom依赖以及 yml配置
<!--hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<!--<version>2.1.0</version>-->
<version>1.2.0</version>
</dependency>
<!--<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.12.1-hadoop2</version>
</dependency>-->
<!-- <dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop-hbase</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>-->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
application.properties 配置(注意:以下为 properties 格式,而非 yml)
server.port=9090
### solr整合
spring.data.solr.core=SentimentCollnew
spring.data.solr.host=http://192.168.11.201:8983/solr,http://192.168.11.202:8983/solr,http://192.168.11.203:8983/solr
spring.data.solr.repositories.enabled=true
spring.data.solr.zk-host=192.168.11.201:2181,192.168.11.202:2181,192.168.11.203:2181
### rabbit 整合
spring.rabbitmq.host=192.168.11.56
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
#spring.rabbitmq.publisher-confirms=true
#spring.rabbitmq.publisher-confirm-type=correlated
##手动
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
### hbase 整合
hbase.zookeeper.quorum=192.168.11.206,192.168.11.207,192.168.11.208
hbase.zookeeper.property.clientPort=2181
zookeeper.znode.parent=/hbase
### mybaits
mybatis.configuration.cache-enabled=false
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.mapper-locations=classpath*:/mapper/**/*Mapper.xml
mybatis.configuration.type-aliases-package=com.feifan.data.dao
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
###pagehelper
pagehelper.helper-dialect=mysql
pagehelper.reasonable=true
pagehelper.support-methods-arguments=false
### 日志等级 logging.level.com.feifan.data=error
#logging.level.com.feifan.data=error
logging.level.root=error
logging.file.max-size=10MB
# jdbc_config datasource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/yq?useUnicode=true&characterEncoding=UTF-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=&BW8@YKhFtA#p1Mz
# Hikari will use the above plus the following to setup connection pooling Hikari
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.max-lifetime=1800000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1
## hbase table
hbase.mian.table=crawler