HbaseDao:是定义一些内部操作的工具类,是实现自己程序主要功能的实现类,如:发布微博、删除微博、关注用户、取关用户、获取用户微博详情和获取用户初始化界面
package tyh.dao;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import tyh.constants.constants;
import java.io.IOException;
import java.util.ArrayList;
public class HbaseDao {
/**
* 1、发布微博
* 2、删除微博
* 3、关注用户
* 4、取关用户
* 5、获取用户微博详情
* 6、获取用户初始化界面
*/
//一、发布微博
public static void publishWeiBo(String Uid, String content) throws IOException {
/**
* Uid为用户id
* content为微博内容
*/
//1、获取connection对象
Connection connection = ConnectionFactory.createConnection(constants.CONFIGURATION);
//第一部分:操作微博内容表
//2、获取微博内容表对象
//表名是统一的表名,防止值在调用方法时输入容易出错,
// 且此操作只针对唯一的表名,所以也同configuration一样从constants中获取
Table contTable = connection.getTable(TableName.valueOf(constants.CONTENT_TABLE));
//3、获取当前时间戳
/**
* 用意是每个人可能会发很多微博,所以让微博内容存到历史版本不行,会导致过去发布的微博丢失
* 让每个微博存放一个列的话会导致列太多,所以将ts与用户id联合起来作为row key来存放用户发布的微博
*/
//用最大的时间戳减获取到的时间戳,实现越早发的微博在越底下
long ts_now = System.currentTimeMillis();
long ts_max = 9999999999999L;
long ts = ts_max - ts_now;
//4、获取row key
String row_Key = Uid + "_" + ts;
//5、创建put对象
Put put = new Put(Bytes.toBytes(row_Key));
//6、给put对象赋值
put.addColumn(Bytes.toBytes(constants.CONTENT_TABLE_CF),
//列名
Bytes.toBytes("content"),
Bytes.toBytes(content));
//7、执行插入数据操作
contTable.put(put);
//第二部分:将微博推送给粉丝
//1、获取微博关系表对象
Table relaTable = connection.getTable(TableName.valueOf(constants.RELATION_TABLE));
//2、获取粉丝列族
Get get = new Get(Bytes.toBytes(Uid));
//指定到粉丝列族
get.addFamily(Bytes.toBytes(constants.RELATION_TABLE_FANS));
//得到粉丝的cell集合
Result result = relaTable.get(get);
//3、创建写入收件箱表的put集合
ArrayList<Put> inboxPuts = new ArrayList<Put>();
//4、遍历粉丝
for (Cell cell : result.rawCells()) {
//5、构建收件箱表的put对象,得到关系表的粉丝列族的
Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
//6、给put对象赋值,列族通过constants获取,列名为当前发布微博的人的uid,值为row_Key
inboxPut.addColumn(Bytes.toBytes(constants.INBOX_TABLE_CONCERN),
Bytes.toBytes(Uid),
Bytes.toBytes(row_Key));
//7、存到集合中
inboxPuts.add(inboxPut);
}
//8、判断是否有粉丝
if (inboxPuts.size() > 0) {
//9、如果有粉丝再创建收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(constants.INBOX_TABLE));
//10、将put集合写入收件箱表中
inboxTable.put(inboxPuts);
//11、关闭inboxTable表的连接
inboxTable.close();
}
//12、关闭其它资源
contTable.close();
relaTable.close();
connection.close();
}
//二、删除微博
public static void deleteWeiBo(String Uid, String Row_Key) throws IOException {
//1、获取connection对象
Connection connection = ConnectionFactory.createConnection(constants.CONFIGURATION);
//第一部分:删除微博
//1、获取微博内容表对象
Table contTable = connection.getTable(TableName.valueOf(constants.CONTENT_TABLE));
//2、获取delete对象
Delete deleteWeiBo = new Delete(Bytes.toBytes(Row_Key));
//3、删除博客
contTable.delete(deleteWeiBo);
//第二部分:删除微博粉丝推送的那篇博客
//1、获取微博关系表对象
Table relaTable = connection.getTable(TableName.valueOf(constants.RELATION_TABLE));
//2、获取粉丝列族
Get get = new Get(Bytes.toBytes(Row_Key));
//指定到粉丝列族
get.addFamily(Bytes.toBytes(constants.RELATION_TABLE_FANS));
//得到粉丝的cell集合
Result result = relaTable.get(get);
//3、创建删除微博推送的delete集合
ArrayList<Delete> inboxDeletes = new ArrayList<Delete>();
//4、遍历粉丝
for (Cell cell : result.rawCells()) {
//5、创建delete对象
Delete delete = new Delete(Bytes.toBytes(Uid));
//6、给delete对象赋值
delete.addColumns(Bytes.toBytes(constants.INBOX_TABLE_CONCERN), CellUtil.cloneQualifier(cell));
//7、将delete对象传入列表中
inboxDeletes.add(delete);
}
//删除推送
if (inboxDeletes.size() > 0) {
//8、创建收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(constants.INBOX_TABLE));
//9、删除推送信息
inboxTable.delete(inboxDeletes);
//10、关闭表连接
inboxTable.close();
}
//11、关闭其它连接
contTable.close();
relaTable.close();
connection.close();
}
//三、关注用户
//uid为操作用户,attends为要关注的对象们
public static void addAttends(String uid, String... attends) throws IOException {
//需要关注的人是否添加
if (attends.length <= 0) {
System.out.println("请添加待关注的人");
return;
}
//获取connection对象
Connection connection = ConnectionFactory.createConnection(constants.CONFIGURATION);
//第一部分:操作用户关系表
/**
* 有两个操作需要我们完成
* 1、操作用户添加需要关注的对象,那么操作用户的关注列族attend要添加关注对象的值
* 2、被关注者需要添加操作用户到fans列族中
* 这两个操作都是操作一个表,所以可以存到一个集合中
*/
//1、获取用户关系表对象,relation:关系
Table relaTable = connection.getTable(TableName.valueOf(constants.RELATION_TABLE));
//2、创建一个集合存放操作用户关系表的put对象
ArrayList<Put> relaPuts = new ArrayList<Put>();
//3、创建操作用户的put对象,operator:操作者
Put operPut = new Put(Bytes.toBytes(uid));
//4、循环创建被关注者的put对象
for (String attend : attends) {
//5、创建被关注者的put对象,is operator:被操作者
Put isOperPut = new Put(Bytes.toBytes(attend));
//6、为被操作者的put对象赋值,即将操作者的uid放入被操作者的粉丝列族中
isOperPut.addColumn(Bytes.toBytes(constants.RELATION_TABLE_FANS),
Bytes.toBytes(uid),
Bytes.toBytes(uid));
//7、为操作者的put对象赋值,即将被操作者(attend)加入操作者的attends列族中
operPut.addColumn(Bytes.toBytes(constants.RELATION_TABLE_ATTEND),
Bytes.toBytes(attend),
Bytes.toBytes(attend));
//8、将被操作者的put对象放入集合中
relaPuts.add(isOperPut);
}
//9、将操作者的put对象放入集合中
relaPuts.add(operPut);
//10、执行put操作
relaTable.put(relaPuts);
//第二部分:操作收件箱表
/**
* 主要进行的操作是将关注的对象发送的微博推送给自己,即:
* 1、遍历关注的对象
* 2、获取关注对象发布的微博
* 3、将相应的信息传入put中,然后
*/
//1、获取微博内容表对象
Table contTable = connection.getTable(TableName.valueOf(constants.CONTENT_TABLE));
//2、创建收件箱表put对象,因为都是对操作者进行推送,所以用的是同一个row key所以
Put inboxPut = new Put(Bytes.toBytes(uid));
//3、循环attends,获取每个被关注者的微博
for (String attend : attends) {
//4、获取当前被关注者的微博使用scan,因为我们不知道那个微博的时间戳,所以用attend_作为startrow,attend|作为startrow
//得到ResultScanner,那么如何只获取2个版本呢?
//可以在存的时候就讲后发的放到最前面,因为是按时间戳来分的,而时间戳又有13位,所以用13位的999来减去获取的时间戳就能
//达到最新发布的时间戳在最前面一位
Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));
ResultScanner resultScanner = contTable.getScanner(scan);
long ts = System.currentTimeMillis();
//5、对获取到的resultScanner进行遍历,因为只有两个版本所以循环两次
//获取到一行数据
int i = constants.INBOX_TABLE_VERSIONS;
for (Result result : resultScanner){
if (i-- == 0){
break;
}
//6、给收件箱put对象赋值
/**
* 此时会出现一个问题,就是程序执行时没有赋值ts,导致使用程序内部的ts,导致时间戳相同,只会显示一个数据
* 我们每次add时给ts++,让每次的ts都不同
*/
inboxPut.addColumn(Bytes.toBytes(constants.INBOX_TABLE_CONCERN),
Bytes.toBytes(attend),
ts++,
result.getRow());
}
}
//7、判断put对象是否为空
if (!inboxPut.isEmpty()) {
//8、获取收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(constants.INBOX_TABLE));
//9、执行收件箱表插入数据操作
inboxTable.put(inboxPut);
//10、关闭inboxTable表连接
inboxTable.close();
}
//11、关闭其它连接
contTable.close();
relaTable.close();
connection.close();
}
//四、取关用户
public static void deleteAttends(String uid, String... delAttends) throws IOException {
if (delAttends.length <= 0){
System.out.println("请添加要取关的用户!!!");
return;
}
//获取connection对象
Connection connection = ConnectionFactory.createConnection(constants.CONFIGURATION);
//第一部分:操作用户关系表对象
//1、获取用户关系表对象
Table relaTable = connection.getTable(TableName.valueOf(constants.RELATION_TABLE));
//2、创建一个集合,存放用户关系表delete操作对象
ArrayList<Delete> relaDelete = new ArrayList<Delete>();
//3、创建操作者(uid)的delete对象
Delete operDelete = new Delete(Bytes.toBytes(uid));
//4、循环创建被删除对象(delAttends)的delete对象
for (String delAttend : delAttends) {
//5、给操作者delete对象赋值
operDelete.addColumns(Bytes.toBytes(constants.RELATION_TABLE_ATTEND),
Bytes.toBytes(delAttend));
//6、创建被操作者delete对象
Delete isOperDelete = new Delete(Bytes.toBytes(delAttend));
//7、给被操作者delete对象赋值
isOperDelete.addColumns(Bytes.toBytes(constants.RELATION_TABLE_FANS),
Bytes.toBytes(uid));
//8、将被操作者的delete对象放到集合中
relaDelete.add(isOperDelete);
}
//9、将操作者的delete对象放入集合中
relaDelete.add(operDelete);
//10、执行用户关系表的delete操作
relaTable.delete(relaDelete);
//第二部分:删除收件箱的推送信息
//1、获取收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(constants.INBOX_TABLE));
//2、创建删除者收件箱表delete对象
Delete inboxDelete = new Delete(Bytes.toBytes(uid));
//3、循环删除取关对象的推送信息
for (String delAttend : delAttends) {
//4、给delete对象赋值
inboxDelete.addColumns(Bytes.toBytes(constants.INBOX_TABLE_CONCERN),
Bytes.toBytes(delAttend));
}
//5、对收件箱表进行删除操作
inboxTable.delete(inboxDelete);
//6、关闭资源
inboxTable.close();
relaTable.close();
connection.close();
}
//五:获取用户微博详情
public static void getDetails(String uid) throws IOException {
//1、获取connection对象
Connection connection = ConnectionFactory.createConnection(constants.CONFIGURATION);
//2、获取微博内容对象
Table contTable = connection.getTable(TableName.valueOf(constants.CONTENT_TABLE));
//3、创建微博内容表scan对象,来获取该用户的所有微博
Scan scan = new Scan();
//构建过滤器,过滤有uid_的数据
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_"));
//为scan加过滤器
scan.setFilter(rowFilter);
//4、获取数据
ResultScanner contScanner = contTable.getScanner(scan);
//5、遍历获取到的数据
for (Result contResult : contScanner) {
for (Cell contCell : contResult.rawCells()) {
//6、打印数据
System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(contCell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(contCell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(contCell)) +
",value:" + Bytes.toString(CellUtil.cloneValue(contCell)));
}
}
//7、关闭资源
contTable.close();
connection.close();
}
//六、获取用户初始化界面
public static void getInit(String uid) throws IOException {
//1、获取connection对象
Connection connection = ConnectionFactory.createConnection(constants.CONFIGURATION);
//2、获取收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(constants.INBOX_TABLE));
//3、获取微博内容表对象
Table contTable = connection.getTable(TableName.valueOf(constants.CONTENT_TABLE));
//4、创建收件箱表get对象,并获取数据(设置最大版本)
Get inboxGet = new Get(Bytes.toBytes(uid));
//设置获取数据的最大版本数
//如果不传参数,默认为表的最大版本数,如果传参数,不能大于表的最大版本数,否则会报错
inboxGet.setMaxVersions();
Result inboxResult = inboxTable.get(inboxGet);
//5、遍历收件箱表获取的数据
for (Cell inboxCell : inboxResult.rawCells()) {
//6、构建微博内容表的get对象
Get contGet = new Get(CellUtil.cloneValue(inboxCell));
//7、获取微博数据内容
Result contResult = contTable.get(contGet);
//8、解析内容并打印
for (Cell contCell : contResult.rawCells()) {
System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(contCell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(contCell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(contCell)) +
",value:" + Bytes.toString(CellUtil.cloneValue(contCell)));
}
}
//9、关闭资源
contTable.close();
inboxTable.close();
connection.close();
}
}