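I. Scenario: send push notifications and SMS messages to app users, and store the message content so the app can query it later
II. Analysis:
1. Because message content must be stored for the app to query, every message needs one record per recipient user. With a large user base the data volume is massive and grows without bound, and historical data cannot be purged.
2. Reads and writes occur in roughly equal proportion.
We therefore decided to build the whole system on HBase for storage, with Kafka as the message middleware.
Let's first look at what HBase is and why we chose it.
HBase is a distributed, column-oriented storage system built on top of HDFS. It is a good fit when you need real-time reads and writes with random access over very large datasets. Its main characteristics are:
1. Large: a single table can have hundreds of millions of rows and millions of columns.
2. Column-oriented: storage and access control are organized by column family, and each column (family) can be retrieved independently.
3. Sparse: empty (NULL) columns take no storage space, so tables can be designed to be very sparse.
4. Schema-less: each row has a sortable row key and any number of columns; columns can be added dynamically as needed, and different rows in the same table can have completely different columns.
5. Multi-versioned data: each cell can hold multiple versions of its data. By default the version number is assigned automatically and is the timestamp at which the cell was written.
6. Single data type: HBase stores everything as uninterpreted byte arrays (byte[]) and has no column types; converting to and from strings, numbers, etc. is up to the client.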
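The list above describes HBase's logical model: a sorted map from rowkey to column family to qualifier to timestamped versions of byte[] values. The plain-Java sketch below is a toy, not the HBase client API, and the class and method names are made up for illustration. It models that nesting so the sparse and multi-version properties are visible.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.TreeMap;

// Toy in-memory model of HBase's *logical* data model (not the HBase client API):
// rowkey -> family -> qualifier -> timestamp -> value, where every value is a byte[].
class HBaseDataModelSketch {
    // TreeMaps keep rows, families and qualifiers sorted
    // (String order here; real HBase sorts by unsigned raw bytes).
    private final TreeMap<String, TreeMap<String, TreeMap<String, TreeMap<Long, byte[]>>>> rows =
            new TreeMap<>();

    // Writes one version of one cell. Cells that are never written simply do not exist,
    // which is why empty columns cost no storage (sparse) and rows can have different columns.
    void put(String row, String family, String qualifier, long timestamp, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>())
            .computeIfAbsent(family, f -> new TreeMap<>())
            // Versions ordered newest-first, like HBase's descending timestamps.
            .computeIfAbsent(qualifier, q -> new TreeMap<Long, byte[]>(Collections.reverseOrder()))
            .put(timestamp, value.getBytes(StandardCharsets.UTF_8));
    }

    // Newest version of a cell, or null if the cell was never written.
    String getLatest(String row, String family, String qualifier) {
        TreeMap<Long, byte[]> versions = versions(row, family, qualifier);
        return versions.isEmpty()
                ? null
                : new String(versions.firstEntry().getValue(), StandardCharsets.UTF_8);
    }

    // How many versions of a cell are stored (0 if absent).
    int versionCount(String row, String family, String qualifier) {
        return versions(row, family, qualifier).size();
    }

    private TreeMap<Long, byte[]> versions(String row, String family, String qualifier) {
        TreeMap<String, TreeMap<String, TreeMap<Long, byte[]>>> families = rows.get(row);
        TreeMap<String, TreeMap<Long, byte[]>> columns = families == null ? null : families.get(family);
        TreeMap<Long, byte[]> versions = columns == null ? null : columns.get(qualifier);
        return versions == null ? new TreeMap<Long, byte[]>() : versions;
    }
}
```

Writing the same cell twice with different timestamps keeps both versions, and a read returns the newest one. Reading a column that a row never wrote returns null instead of an empty stored value.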
III. HBase provides a Java API; basic usage looks like this:
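Table table = connection.getTable(TableName.valueOf("table_name"));
Put put = new Put(Bytes.toBytes("112233bbbcccc"));// one Put is one row; create another Put for a second row. Every row has a unique rowkey, here the value passed to the Put constructor
put.addColumn(Bytes.toBytes("column1"), null, Bytes.toBytes("aaa"));// first column of this row (family "column1", empty qualifier)
put.addColumn(Bytes.toBytes("column2"), null, Bytes.toBytes("bbb"));// second column of this row
put.addColumn(Bytes.toBytes("column3"), null, Bytes.toBytes("ccc"));// third column of this row
table.put(put);// save the data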
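Rowkeys, families, qualifiers and values all end up as byte[]. HBase's `Bytes.toBytes(String)` always encodes UTF-8, but a plain `String.getBytes()` without an argument uses the JVM's default charset. The plain-Java sketch below has no HBase dependency and its helper names are hypothetical. It shows that ASCII keys encode identically either way, while non-ASCII keys do not. ISO-8859-1 stands in for a non-UTF-8 default charset.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-Java illustration (no HBase dependency) of why the charset used to turn
// rowkeys and values into byte[] must be consistent.
class RowkeyEncodingSketch {
    // What HBase's Bytes.toBytes(String) produces: always UTF-8.
    static byte[] asHBaseBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // What String.getBytes(charset) produces for some other charset; a JVM whose
    // default charset is not UTF-8 behaves like this when getBytes() is called without one.
    static byte[] asOtherCharset(String s, Charset charset) {
        return s.getBytes(charset);
    }

    // True if both encodings give identical bytes, i.e. would address the same HBase row.
    static boolean sameRowkey(String s, Charset charset) {
        return Arrays.equals(asHBaseBytes(s), asOtherCharset(s, charset));
    }
}
```

One way to avoid the problem is to use `Bytes.toBytes`/`Bytes.toString` on both the write and read paths, or to pass `StandardCharsets.UTF_8` explicitly.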
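As you can see, calling the native API directly throughout the system, without wrapping the HBase operations, quickly becomes tedious. This brings us to our main topic: building an ORM-style mapping layer on top of HBase's characteristics.
First, let's lay out the key elements involved when storing data in HBase:
1. tableName: the table name, needed to obtain the table from the connection
2. family: the column family. Put similar columns that are usually accessed together in the same column family, so that reading a few columns reads as little data as possible
3. qualifier: the column name, which maps to the column's value
4. timestamp: the timestamp (version) of the cell
Based on these elements, we can handle the key attributes with custom annotations, as follows: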
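@HbaseTable(tableName="t_demo") // table name
public class Demo {
@HbaseColumn(family="rowkey", qualifier="rowkey") // rowkey value
private String id;
@HbaseColumn(family="demo", qualifier="content") // column
private String content;
@HbaseColumn(family="demo", qualifier="avg") // column
private String avg;
// HBaseBeanUtil.resultToBean calls the setters by reflection, and BeanUtils.cloneBean copies through getters/setters
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getContent() { return content; }
public void setContent(String content) { this.content = content; }
public String getAvg() { return avg; }
public void setAvg(String avg) { this.avg = avg; }
}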
HbaseTable:
package com.muheda.notice.hbase;
import java.lang.annotation.*;
/**
* @Author: Sorin
* @Descriptions: custom annotation used to obtain the table name
* @Date: Created in 2018/3/22
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE })
@Inherited
public @interface HbaseTable {
String tableName() default "";
}
HbaseColumn:
package com.muheda.notice.hbase;
import java.lang.annotation.*;
/**
* @Author: Sorin
* @Descriptions: custom annotation describing the family and qualifier a field belongs to, i.e. the HBase column family and column
* @Date: Created in 2018/3/22
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD })
@Inherited
public @interface HbaseColumn {
String family() default "";
String qualifier() default "";
boolean timestamp() default false;
}
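Both annotations use `RetentionPolicy.RUNTIME`, so the DAO can read them by reflection. The self-contained sketch below re-declares them as nested types so it compiles on its own. It shows how a table name and a field-to-`family:qualifier` mapping can be extracted. The helper names `tableName` and `columns` are hypothetical and not part of the original code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

class AnnotationMappingSketch {
    // Local copies of the two annotations; RUNTIME retention is what lets reflection see them.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface HbaseTable {
        String tableName() default "";
    }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface HbaseColumn {
        String family() default "";
        String qualifier() default "";
    }

    // Same shape as the Demo entity, plus one unannotated field.
    @HbaseTable(tableName = "t_demo")
    static class Demo {
        @HbaseColumn(family = "rowkey", qualifier = "rowkey") String id;
        @HbaseColumn(family = "demo", qualifier = "content") String content;
        @HbaseColumn(family = "demo", qualifier = "avg") String avg;
        String notMapped; // no @HbaseColumn, so it is ignored
    }

    // Table name from @HbaseTable, or "" if the class is not annotated.
    static String tableName(Class<?> clazz) {
        HbaseTable table = clazz.getAnnotation(HbaseTable.class);
        return table == null ? "" : table.tableName();
    }

    // Field name -> "family:qualifier" for every field carrying @HbaseColumn.
    static Map<String, String> columns(Class<?> clazz) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (Field field : clazz.getDeclaredFields()) {
            HbaseColumn column = field.getAnnotation(HbaseColumn.class);
            if (column != null) {
                mapping.put(field.getName(), column.family() + ":" + column.qualifier());
            }
        }
        return mapping;
    }
}
```

Reflection is comparatively slow, so a real implementation would likely compute this once per class and cache it. The DAO appears to do something similar with its `HconnectionFactory.TableMaps` cache of family/qualifier pairs.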
Next, let's wrap the DAO operations:
package com.muheda.notice.hbase;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.*;
/**
* @Author: Sorin
* @Descriptions: common class for HBase DAO operations
* @Date: Created in 2018/3/22
*/
@Component("hBaseDaoUtil")
public class HBaseDaoUtil {
protected final org.slf4j.Logger logger = LoggerFactory.getLogger(this.getClass());
// Close the connection
public static void close() {
if (HconnectionFactory.connection != null) {
try {
HconnectionFactory.connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// Get the table name from the @HbaseTable annotation ("" if the class is not annotated)
public String getORMTable(Object obj) {
HbaseTable table = obj.getClass().getAnnotation(HbaseTable.class);
return table == null ? "" : table.tableName();
}
/**
* @Description: create a table
* @Author: Sorin
* @param tableName
* @param familyColumn
* @Date: 2018/3/22
*/
public void createTable(String tableName, Set<String> familyColumn) {
TableName tn = TableName.valueOf(tableName);
try {
Admin admin = HconnectionFactory.admin;
HTableDescriptor htd = new HTableDescriptor(tn);
for (String fc : familyColumn) {
HColumnDescriptor hcd = new HColumnDescriptor(fc);
htd.addFamily(hcd);
}
admin.createTable(htd);
} catch (IOException e) {
e.printStackTrace();
logger.error("Failed to create table " + tableName + "!", e);
}
}
/**
* @Description: drop a table
* @Author: Sorin
* @param tableName
* @Date: 2018/3/22
*/
public void dropTable(String tableName) {
TableName tn = TableName.valueOf(tableName);
try {
Admin admin = HconnectionFactory.admin;
admin.disableTable(tn);
admin.deleteTable(tn);
} catch (IOException e) {
e.printStackTrace();
logger.error("Failed to drop table " + tableName + "!", e);
}
}
/**
* @Description: query with equality filter conditions (all conditions must match)
* @Author: Sorin
* @param obj
* @param param
* @Date: 2018/3/26
*/
public <T> List<T> queryScan(T obj, Map<String, String> param)throws Exception{
List<T> objs = new ArrayList<T>();
String tableName = getORMTable(obj);
if (StringUtils.isBlank(tableName)) {
return null;
}
ResultScanner scanner = null;
try {
Table table = getTable(tableName);
Admin admin = HconnectionFactory.admin;
if(!admin.isTableAvailable(TableName.valueOf(tableName))){
return objs;
}
Scan scan = new Scan();
FilterList filter = new FilterList();
// Take family and qualifier from the cache and build the query conditions
Map<String, List<Map<String, String>>> tableMaps = HconnectionFactory.TableMaps;
List<Map<String, String>> lists = tableMaps.get(tableName);
for (Map.Entry<String, String> entry : param.entrySet()){
for (Map<String, String> map : lists) {
String family = map.get("family");
String qualifier = map.get("qualifier");
if(qualifier.equals(entry.getKey())){
Filter filterDetail = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(entry.getValue()));
filter.addFilter(filterDetail);
}
}
}
scan.setFilter(filter);
scanner = table.getScanner(scan);
for (Result result : scanner) {
T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBean(result, obj));
objs.add(beanClone);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("Query failed!", e);
throw new Exception(e);
} finally {
if (scanner != null) scanner.close(); // null if we returned early or getScanner failed
}
return objs;
}
/**
* @Description: query by rowkey
* @Author: Sorin
* @param obj
* @param rowkeys
* @Date: 2018/3/22
*/
public <T> List<T> get(T obj, String ... rowkeys) {
List<T> objs = new ArrayList<T>();
String tableName = getORMTable(obj);
if (StringUtils.isBlank(tableName)) {
return objs;
}
try {
Admin admin = HconnectionFactory.admin;
if(!admin.isTableAvailable(TableName.valueOf(tableName))){
return objs;
}
List<Result> results = getResults(tableName, rowkeys);
if (results.isEmpty()) {
return objs;
}
for (int i = 0; i < results.size(); i++) {
T bean = null;
Result result = results.get(i);
if (result == null || result.isEmpty()) {
continue;
}
bean = (T) BeanUtils.cloneBean(HBaseBeanUtil.resultToBean(result, obj)); // resultToBean fills obj in place, so clone it for each row
objs.add(bean);
}
}catch (Exception e){
e.printStackTrace();
}
return objs;
}
/**
* @Description: save entity objects
* @Author: Sorin
* @param objs
* @Date: 2018/3/22
*/
public <T> boolean save(T ... objs) {
List<Put> puts = new ArrayList<Put>();
String tableName = "";
try {
for (Object obj : objs) {
if (obj == null) {
continue;
}
tableName = getORMTable(obj);
Put put = HBaseBeanUtil.beanToPut(obj);
puts.add(put);
}
}catch (Exception e){
e.printStackTrace();
logger.error("Failed to save to HBase!", e);
}
return savePut(puts, tableName);
}
/**
* @Description: save into the given tableName
* @Author: Sorin
* @param tableName
* @param objs
* @Date: 2018/3/22
*/
public <T> void save(String tableName, T ... objs){
List<Put> puts = new ArrayList<Put>();
for (Object obj : objs) {
if (obj == null) {
continue;
}
try {
Put put = HBaseBeanUtil.beanToPut(obj);
puts.add(put);
} catch (Exception e) {
e.printStackTrace();
logger.warn("", e);
}
}
savePut(puts, tableName);
}
/**
* @Description: delete by rowkeys
* @Author: Sorin
* @param obj
* @param rowkeys
* @Date: 2018/3/22
*/
public <T> void delete(T obj, String... rowkeys) {
String tableName = "";
tableName = getORMTable(obj);
if (StringUtils.isBlank(tableName)) {
return;
}
List<Delete> deletes = new ArrayList<Delete>();
for (String rowkey : rowkeys) {
if (StringUtils.isBlank(rowkey)) {
continue;
}
deletes.add(new Delete(Bytes.toBytes(rowkey)));
}
delete(deletes, tableName);
}
/**
* @Description: batch delete
* @Author: Sorin
* @param deletes
* @param tableName
* @Date: 2018/3/22
*/
private void delete(List<Delete> deletes, String tableName) {
if (StringUtils.isBlank(tableName)) {
logger.info("tableName is empty!");
return;
}
try {
Table table = getTable(tableName);
table.delete(deletes);
} catch (IOException e) {
e.printStackTrace();
logger.error("Delete failed!", e);
}
}
/**
* @Description: get the column family names of a table
* @Author: Sorin
* @param tableName
* @Date: 2018/3/22
*/
public List<String> familys(String tableName) {
try {
Table table = getTable(tableName);
List<String> columns = new ArrayList<String>();
if (table==null) {
return columns;
}
HTableDescriptor tableDescriptor = table.getTableDescriptor();
HColumnDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();
for (HColumnDescriptor columnDescriptor :columnDescriptors) {
String columnName = columnDescriptor.getNameAsString();
columns.add(columnName);
}
return columns;
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to query column family names!", e);
}
return new ArrayList<String>();
}
// Save helper
private boolean savePut(List<Put> puts, String tableName){
if (StringUtils.isBlank(tableName)) {
return false;
}
try {
Table table = getTable(tableName);
table.put(puts);
return true;
}catch (IOException e) {
logger.error("Failed to save to table " + tableName + "!", e);
return false;
}
}
// Fetch the results for a batch of rowkeys
private List<Result> getResults(String tableName, String... rowkeys) {
List<Result> resultList = new ArrayList<Result>();
List<Get> gets = new ArrayList<Get>();
for (String rowkey : rowkeys) {
if (StringUtils.isBlank(rowkey)) {
continue;
}
Get get = new Get(Bytes.toBytes(rowkey));
gets.add(get);
}
try {
Table table = getTable(tableName);
Result[] results = table.get(gets);
Collections.addAll(resultList, results);
return resultList;
} catch (Exception e) {
e.printStackTrace();
return resultList;
}
}
/**
* @Description: query with greater-or-equal filter conditions (all conditions must match)
* @Author: Sorin
* @param obj
* @param param
* @Date: 2018/3/26
*/
public <T> List<T> queryScanGreater(T obj, Map<String, String> param)throws Exception{
List<T> objs = new ArrayList<T>();
String tableName = getORMTable(obj);
if (StringUtils.isBlank(tableName)) {
return null;
}
ResultScanner scanner = null;
try {
Table table = getTable(tableName);
Admin admin = HconnectionFactory.admin;
if(!admin.isTableAvailable(TableName.valueOf(tableName))){
return objs;
}
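Scan scan = new Scan();
FilterList filterList = new FilterList(); // MUST_PASS_ALL by default: every condition must hold
// Take family and qualifier from the cache and build the query conditions
Map<String, List<Map<String, String>>> tableMaps = HconnectionFactory.TableMaps;
List<Map<String, String>> lists = tableMaps.get(tableName);
for (Map.Entry<String, String> entry : param.entrySet()){
for (Map<String, String> map : lists) {
String family = map.get("family");
String qualifier = map.get("qualifier");
if(qualifier.equals(entry.getKey())){
// Values are compared byte by byte (lexicographically), not numerically
Filter filter = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(entry.getValue()));
filterList.addFilter(filter); // calling scan.setFilter here would keep only the last condition
}
}
}
scan.setFilter(filterList);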
scanner = table.getScanner(scan);
for (Result result : scanner) {
T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBean(result, obj));
objs.add(beanClone);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("Query failed!", e);
throw new Exception(e);
}finally {
if (scanner != null) scanner.close(); // null if we returned early or getScanner failed
}
return objs;
}
/**
* @Description: paged query (reverse scan starting at startrowname)
* @Author: Sorin
* @param obj
* @param startrowname
* @param pageSize
* @Date: 2018/4/25
*/
public <T> List<T> queryScanPage(T obj, String startrowname, String pageSize, Map<String, String> param) throws Exception{
List<T> objs = new ArrayList<T>();
String tableName = getORMTable(obj);
if (StringUtils.isBlank(tableName)) {
return null;
}
ResultScanner scanner = null;
try {
Table table = getTable(tableName);
Filter filter = new PageFilter(Integer.parseInt(pageSize)); // applied on each region server separately, so the total can exceed pageSize
FilterList filterList = new FilterList();
Scan scan = new Scan(Bytes.toBytes(startrowname));
// Take family and qualifier from the cache and build the query conditions
Map<String, List<Map<String, String>>> tableMaps = HconnectionFactory.TableMaps;
List<Map<String, String>> lists = tableMaps.get(tableName);
for (Map.Entry<String, String> entry : param.entrySet()){
for (Map<String, String> map : lists) {
String family = map.get("family");
String qualifier = map.get("qualifier");
if(qualifier.equals(entry.getKey())){
Filter filterDetail = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(entry.getValue()));
filterList.addFilter(filterDetail);
}
}
}
filterList.addFilter(filter);
scan.setFilter(filterList);
scan.setReversed(true);
scanner = table.getScanner(scan);
for (Result result : scanner) {
T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBean(result, obj));
objs.add(beanClone);
}
}catch (Exception e){
e.printStackTrace();
logger.error("Query failed!", e);
}finally {
if (scanner != null) scanner.close(); // null if getTable or getScanner failed
}
return objs;
}
/**
* Query records by rowkey prefix
* @param obj
* @param rowkey prefix that the rowkey must start with
* @param <T>
* @return
*/
public <T> List<T> queryScanRowkey(T obj, String rowkey){
List<T> objs = new ArrayList<T>();
String tableName = getORMTable(obj);
if (StringUtils.isBlank(tableName)) {
return null;
}
ResultScanner scanner = null;
try {
Table table = getTable(tableName);
Scan scan = new Scan();
scan.setRowPrefixFilter(Bytes.toBytes(rowkey));
scanner = table.getScanner(scan);
for (Result result : scanner) {
T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBean(result, obj));
objs.add(beanClone);
}
}catch (Exception e){
e.printStackTrace();
logger.error("Query failed!", e);
}finally {
if (scanner != null) scanner.close(); // null if getTable or getScanner failed
}
return objs;
}
/**
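* Query records by rowkey prefix using a RowFilter (despite the name, no page size is applied)
* @param obj
* @param rowkey prefix that the rowkey must start with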
* @param <T>
* @return
*/
public <T> List<T> queryScanRowkeyPage(T obj, String rowkey){
List<T> objs = new ArrayList<T>();
String tableName = getORMTable(obj);
if (StringUtils.isBlank(tableName)) {
return null;
}
ResultScanner scanner = null;
FilterList filterList = new FilterList();
Scan scan = new Scan(Bytes.toBytes(rowkey));
try {
Table table = getTable(tableName);
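// No stop row is set, so the scan continues to the end of the table; setRowPrefixFilter avoids that
Filter filterDetail = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes(rowkey)));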
filterList.addFilter(filterDetail);
scan.setFilter(filterList);
scanner = table.getScanner(scan);
for (Result result : scanner) {
T beanClone = (T)BeanUtils.cloneBean(HBaseBeanUtil.resultToBean(result, obj));
objs.add(beanClone);
}
}catch (Exception e){
e.printStackTrace();
logger.error("Query failed!", e);
}finally {
if (scanner != null) scanner.close(); // null if getTable or getScanner failed
}
return objs;
}
/**
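* @Description: get the Table for a table name, reusing cached Table instances instead of obtaining a new one for every HBase operation
* Note: HBase Table instances are not thread-safe; the HBase docs recommend sharing one Connection and getting a lightweight Table per use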
* @Author: Sorin
* @param tableName
* @Date: 2018/5/4
*/
private Table getTable(String tableName){
Table table = null;
try {
if("bn_user".equals(tableName)){
table = HconnectionFactory.UserTable;
}else if("bn_notice_user".equals(tableName)){
table = HconnectionFactory.NoticeUserTable;
}else if("bn_notice".equals(tableName)){
table = HconnectionFactory.NoticeTable;
}else if("bn_message".equals(tableName)){
table = HconnectionFactory.MessageTable;
}else{
table = HconnectionFactory.connection.getTable(TableName.valueOf(tableName));
}
} catch (IOException e) {
e.printStackTrace();
}
return table;
}
}
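Two parts of the DAO depend on how HBase compares bytes. Row keys, and `SingleColumnValueFilter` values built from a byte[] (which uses a `BinaryComparator`), are compared lexicographically as unsigned bytes. So in `queryScanGreater`, numbers stored as strings are compared as text, and "10" sorts before "9". Similarly, `Scan.setRowPrefixFilter` in `queryScanRowkey` turns a prefix into a [start, stop) range whose stop row is the prefix with trailing 0xFF bytes dropped and its last byte incremented. The plain-Java sketch below reimplements both ideas for illustration only. It does not call HBase, and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ByteOrderSketch {
    // Unsigned lexicographic comparison: the ordering HBase uses for row keys
    // and for BinaryComparator-based filters.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length; // a shorter key that is a prefix sorts first
    }

    // Stop row for a prefix scan: drop trailing 0xFF bytes, then increment the last byte.
    // An empty result means "no upper bound" (scan to the end of the table).
    static byte[] prefixStopRow(byte[] prefix) {
        int end = prefix.length;
        while (end > 0 && prefix[end - 1] == (byte) 0xFF) {
            end--;
        }
        if (end == 0) {
            return new byte[0];
        }
        byte[] stop = Arrays.copyOf(prefix, end);
        stop[end - 1]++;
        return stop;
    }

    // True if row lies in [prefix, stopRow), i.e. a prefix scan would return it.
    static boolean inPrefixRange(byte[] row, byte[] prefix) {
        byte[] stop = prefixStopRow(prefix);
        return compareUnsigned(row, prefix) >= 0
                && (stop.length == 0 || compareUnsigned(row, stop) < 0);
    }

    static byte[] b(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

Two practical consequences follow. Numeric values used with greater-or-equal filters can be zero-padded to a fixed width, so "0009" sorts before "0010". And in a hypothetical rowkey layout such as `userId_...`, including the delimiter in the prefix (`user1_`) keeps a scan for one user from also matching `user10_...`.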
HBaseBeanUtil:
package com.muheda.notice.hbase;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
/**
* @Author: Sorin
* @Descriptions: converts between JavaBeans and HBase Put/Result objects
* @Date: Created in 2018/3/22
*/
public class HBaseBeanUtil {
private static final Logger logger = LoggerFactory.getLogger(HBaseBeanUtil.class);
/**
* JavaBean转换为Put
* @param <T>
* @param obj
* @return
* @throws Exception
*/
public static <T> Put beanToPut(T obj) throws Exception {
Put put = new Put(Bytes.toBytes(parseObjId(obj)));
Class<?> clazz = obj.getClass();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (!field.isAnnotationPresent(HbaseColumn.class)) {
continue;
}
field.setAccessible(true);
HbaseColumn orm = field.getAnnotation(HbaseColumn.class);
String family = orm.family();
String qualifier = orm.qualifier();
if (StringUtils.isBlank(family) || StringUtils.isBlank(qualifier)) {
continue;
}
if ("rowkey".equalsIgnoreCase(qualifier) || "rowkey".equalsIgnoreCase(family)) {
continue; // the rowkey comes from the id field, it is not stored as a column
}
Object fieldObj = field.get(obj);
if (fieldObj == null) {
continue; // null fields are simply not written (HBase is sparse)
}
if (fieldObj.getClass().isArray()) {
logger.error("nonsupport: array field " + field.getName());
continue;
}
if (StringUtils.isNotBlank(fieldObj.toString())) {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(fieldObj.toString()));
}
}
return put;
}
/**
* 获取Bean中的id,作为Rowkey
* @param <T>
*
* @param obj
* @return
*/
public static <T> String parseObjId(T obj) {
Class<?> clazz = obj.getClass();
try {
Field field = clazz.getDeclaredField("id");
field.setAccessible(true);
Object object = field.get(obj);
return object.toString();
} catch (NoSuchFieldException e) {
logger.error("", e);
} catch (SecurityException e) {
logger.error("", e);
} catch (IllegalArgumentException e) {
logger.error("", e);
} catch (IllegalAccessException e) {
logger.error("", e);
}
return "";
}
/**
* HBase result 转换为 bean
* @param <T>
* @param result
* @param obj
* @return
* @throws Exception
*/
public static <T> T resultToBean(Result result, T obj) throws Exception {
if (result == null) {
return null;
}
Class<?> clazz = obj.getClass();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (!field.isAnnotationPresent(HbaseColumn.class)) {
continue;
}
HbaseColumn orm = field.getAnnotation(HbaseColumn.class);
String family = orm.family();
String qualifier = orm.qualifier();
boolean timeStamp = orm.timestamp();
if (StringUtils.isBlank(family) || StringUtils.isBlank(qualifier)) {
continue;
}
String fieldName = field.getName();
String value = "";
if ("rowkey".equalsIgnoreCase(family)) {
value = Bytes.toString(result.getRow());
} else {
value = getResultValueByType(result, family, qualifier, timeStamp);
}
String firstLetter = fieldName.substring(0, 1).toUpperCase();
String setMethodName = "set" + firstLetter + fieldName.substring(1);
Method setMethod = clazz.getMethod(setMethodName, new Class[] { field.getType() });
setMethod.invoke(obj, new Object[] { value });
}
return obj;
}
/**
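* Read a cell as a String, or the cell's timestamp when timeStamp is true
* @param result
* @param family
* @param qualifier
* @param timeStamp
* @return the value, or "" if the cell is absent
*/
private static String getResultValueByType(Result result, String family, String qualifier, boolean timeStamp) {
if (!timeStamp) {
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
return value == null ? "" : Bytes.toString(value);
}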
List<Cell> cells = result.getColumnCells(Bytes.toBytes(family), Bytes.toBytes(qualifier));
if (cells.size() == 1) {
Cell cell = cells.get(0);
return cell.getTimestamp() + "";
}
return "";
}
}
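HBaseBeanUtil maps a bean to cells and back by reflection. The dependency-free sketch below mirrors that logic, using a `Map<String, String>` keyed by `"family:qualifier"` in place of `Put`/`Result`. On write it skips the rowkey field and null fields. On read it sets the rowkey from the row and fills the other fields from the cells. As a simplification it writes fields directly through `Field.set` instead of calling setters. The class and method names are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

class BeanCellMappingSketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface HbaseColumn {
        String family() default "";
        String qualifier() default "";
    }

    static class Demo {
        @HbaseColumn(family = "rowkey", qualifier = "rowkey") String id;
        @HbaseColumn(family = "demo", qualifier = "content") String content;
        @HbaseColumn(family = "demo", qualifier = "avg") String avg;
    }

    // Bean -> ("family:qualifier" -> value), skipping the rowkey field and null fields:
    // the same checks beanToPut needs before calling put.addColumn.
    static Map<String, String> toCells(Object bean) throws IllegalAccessException {
        Map<String, String> cells = new HashMap<>();
        for (Field field : bean.getClass().getDeclaredFields()) {
            HbaseColumn column = field.getAnnotation(HbaseColumn.class);
            if (column == null || "rowkey".equalsIgnoreCase(column.family())) {
                continue;
            }
            field.setAccessible(true);
            Object value = field.get(bean);
            if (value != null) {
                cells.put(column.family() + ":" + column.qualifier(), value.toString());
            }
        }
        return cells;
    }

    // Row key + cells -> a NEW bean instance; fields without a cell stay null.
    static <T> T fromCells(Class<T> type, String rowkey, Map<String, String> cells)
            throws ReflectiveOperationException {
        T bean = type.getDeclaredConstructor().newInstance();
        for (Field field : type.getDeclaredFields()) {
            HbaseColumn column = field.getAnnotation(HbaseColumn.class);
            if (column == null) {
                continue;
            }
            field.setAccessible(true);
            if ("rowkey".equalsIgnoreCase(column.family())) {
                field.set(bean, rowkey);
            } else {
                field.set(bean, cells.get(column.family() + ":" + column.qualifier()));
            }
        }
        return bean;
    }
}
```

Because `fromCells` creates a fresh instance per row, it does not need the `BeanUtils.cloneBean` step that the DAO uses to avoid filling one shared `obj` repeatedly.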
At this point the HBase operations are wrapped, and they can be used directly like this:
@Component("demoDao")
public class DemoDao {
@Autowired
private HBaseDaoUtil hBaseDaoUtil;
/**
* @Description: save a Demo entity
* @Author: Sorin
* @param demo
* @Date: 2018/3/22
*/
public void save(Demo demo) {
hBaseDaoUtil.save(demo);
}
/**
* @Description: query Demo entities by rowkey
* @Author: Sorin
* @param demo
* @param id
* @Date: 2018/3/22
*/
public List<Demo> getById(Demo demo, String id) {
return hBaseDaoUtil.get(demo, id);
}
}