常量接口
public interface Constant {
//命名空间
String NAMESPACE="weibo";
//内容表
String CONTENT="weibo:content";
//用户关系表
String RELATIONS="weibo:relations";
//收件箱表
String INBOX="weibo:inbox";
}
工具类及测试代码代码
- WeiBoUtil
public class WeiBoUtil {
private static Configuration cfg = HBaseConfiguration.create();
static {
cfg.set("hbase.zookeeper.quorum", "hcmaster:2181,hcslave1:2181,hcslave2:2181");
}
}
public class WeiBoUtilTest {
}
连接池工具类
public class HBasePoolUtil {
private static Configuration conf = null;
// 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
private static ExecutorService executor = null;
private static Connection conn = null;
static {
try {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hcmaster:2181,hcslave1:2181,hcslave2:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.defaults.for.version.skip", "true");
executor = new ThreadPoolExecutor(5, 15, 1000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10), // 使用有界队列,避免OOM
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
conn = ConnectionFactory.createConnection(conf, executor);
} catch (IOException e) {
e.printStackTrace();
}
}
public static Connection getConn() {
return conn;
}
}
具体代码
创建命名空间以及表名的定义
public static void createNamespace(String namespace) throws IOException {
Connection conn = HBasePoolUtil.getConn();
Admin admin = conn.getAdmin();
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(namespaceDescriptor);
admin.close();
}
测试代码:
@Test
public void createNamespace() throws IOException {
WeiBoUtil.createNamespace("weibo");
}
结果:
创建表
public static void creteaTable(String tableName, int version, String... columnFamily) throws IOException {
Connection conn = HBasePoolUtil.getConn();
Admin admin = conn.getAdmin();
boolean tableExists = admin.tableExists(TableName.valueOf(tableName));
if (tableExists) {
System.out.println(tableName + " exists!");
return;
}
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
for (String cf : columnFamily) {
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(cf.getBytes()).setMaxVersions(version).build();
builder.setColumnFamily(columnFamilyDescriptor);
}
admin.createTable(builder.build());
admin.close();
}
测试代码:
@Test
public void creteaTable() throws IOException {
//创建内容表
WeiBoUtil.creteaTable(Constant.CONTENT, 1, "info");
//创建用户关系表
WeiBoUtil.creteaTable(Constant.RELATIONS, 1, "attends", "fans");
//创建收件箱表
WeiBoUtil.creteaTable(Constant.INBOX, 3, "info");
}
运行程序发现在weibo命名空间下创建了weibo:contet、weibo:relations、webo:inbox三个表。
发布微博内容
发布微博时,首先需要往微博内容表中添加数据,然后需要更新收件箱表fans的数据,它包括:
1. 获取当前uid的fans
2. 更新收件箱表中fans的数据
public static void createData(String uid, String content) throws IOException {
Connection conn = HBasePoolUtil.getConn();
long currentTimeMillis = System.currentTimeMillis();
String rowKey = uid + "_" + currentTimeMillis;
//往内容表中添加数据
Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), currentTimeMillis, Bytes.toBytes(content)); //手动指定时间戳
contentTable.put(put);
//用户发了微博,要同时更新它的fans的微博收件列表
//1.获取关注了微博发送者的用户列表
Table relationsTable = conn.getTable(TableName.valueOf(Constant.RELATIONS));
//获取关系表中fans
Get get = new Get(Bytes.toBytes(uid));
get.addFamily(Bytes.toBytes("fans"));
Result result = relationsTable.get(get);
List<Cell> cells = result.listCells();
//如果用户没有关注者,不需要更新微博收件列表表
if (cells == null || cells.size() <= 0) {
return;
}
List<Put> putList = new ArrayList<>();
for (Cell cell : cells) {
byte[] fansUid = CellUtil.cloneQualifier(cell);
Put inboxPut = new Put(fansUid);
inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
putList.add(inboxPut);
}
//更新fans的收件箱表
Table inboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
inboxTable.put(putList);
inboxTable.close();
relationsTable.close();
contentTable.close();
}
测试代码:
@Test
public void createData() throws IOException {
WeiBoUtil.createData("1001","1001 de weibo");
WeiBoUtil.createData("1001","1001 de weibo");
WeiBoUtil.createData("1001","1001 de weibo");
WeiBoUtil.createData("1002","1002 de weibo");
WeiBoUtil.createData("1002","1002 de weibo");
WeiBoUtil.createData("1002","1002 de weibo");
WeiBoUtil.createData("1003","1002 de weibo");
}
结果:
添加关注用户
具体步骤::关注别人的同时会成为别人的fans
在微博用户关系表中
a、 对当前主动操作的用户添加新关注的好友attends
b、 对被关注的用户添加新的粉丝fans
微博收件箱表中,添加所关注的用户发布的微博
a、 在微博内容表中获取被关注者最新的几条微博的rowkey
b、 收件箱中添加操作人所关注的人的数据(最新的几条信息)
public static void addAttends(String uid, String... uids) throws IOException {
//参数过滤
if(uids == null || uids.length <= 0 || uid == null || uid.length() <= 0){
return;
}
Connection conn = HBasePoolUtil.getConn();
Table relationsTable = conn.getTable(TableName.valueOf(Constant.RELATIONS));
Put relationsPut = new Put(Bytes.toBytes(uid));//操作者的put
List<Put> putList = new ArrayList<>();
for (String u : uids) {
relationsPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(u), Bytes.toBytes(u));
Put fansPut = new Put(Bytes.toBytes(u));//创建被关注者的Put对象,添加操作人的attends
fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));//添加被操作人的fans
putList.add(fansPut);
}
putList.add(relationsPut);
relationsTable.put(putList);
//获取内容表中被关注者的rowkey
Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));
Put inboxPut = new Put(uid.getBytes());
for (String s : uids) {
Scan scan = new Scan();
scan.setFilter(new PrefixFilter(s.getBytes()));
ResultScanner resultScanner = contentTable.getScanner(scan);
for (Result result : resultScanner) {
String rowkey = Bytes.toString(result.getRow());
String[] split = rowkey.split("_");
byte[] row = result.getRow(); //获取rowkey
//都采用发布微博时的时间戳
inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), Long.parseLong(split[1]), row);
}
}
Table inboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
inboxTable.put(inboxPut);
inboxTable.close();
relationsTable.close();
contentTable.close();
}
测试代码:
@Test
public void addAttends() throws IOException {
WeiBoUtil.addAttends("1001","1002","1003");
}
结果:
取关用户
在微博用户关系表中
1. 删除操作者关注列族的待取关用户
2. 删除待取关用户用户fans列族的操作者
收件箱表
public static void deleteAttended(String uid, String... uids) throws IOException {
//参数过滤
if(uids == null || uids.length <= 0 || uid == null || uid.length() <= 0){
return;
}
//获取连接
Connection conn = HBasePoolUtil.getConn();
//获取表对象(2个)
Table relationsTable = conn.getTable(TableName.valueOf(Constant.RELATIONS));
//1
List<Delete> deleteList = new ArrayList<>();
Delete attendsDelete = new Delete(Bytes.toBytes(uid));
for (String u : uids) {
attendsDelete.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(u));
Delete fansDelete = new Delete(Bytes.toBytes(u));
fansDelete.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
deleteList.add(fansDelete);
}
deleteList.add(attendsDelete);
relationsTable.delete(deleteList);
//2、删除收件箱表操作者取关用户的信息
Table inboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
Delete inboxDelete = new Delete(Bytes.toBytes(uid));
for (String u : uids) {
inboxDelete.addColumn(Bytes.toBytes("info"), Bytes.toBytes(u));//只能删除一条
}
inboxTable.delete(inboxDelete);
inboxTable.close();
relationsTable.close();
}
测试代码
@Test
public void deleteAttended() throws IOException {
WeiBoUtil.deleteAttended("1001","1002");
}
结果:
获取关注的人的微博内容:初始化页面用
步骤:
a、从微博收件箱中获取所关注的用户的微博RowKey
b、根据获取的RowKey,从微博内容表中得到微博内容
public static void getInit(String uid) throws IOException {
//获取连接
Connection conn = HBasePoolUtil.getConn();
//获取表对象(2个)
Table inoboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));
//获取收件箱数据
Get inoboxGet = new Get(Bytes.toBytes(uid));
inoboxGet.readAllVersions();
List<Get> getList = new ArrayList<>();
Result result = inoboxTable.get(inoboxGet);
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
Get contentGet = new Get(CellUtil.cloneValue(cell));
getList.add(contentGet);
}
Result[] results = contentTable.get(getList);
for (Result res : results) {
for (Cell cell : res.rawCells()) {
//得到rowkey
System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
//得到列族
System.out.print("\t列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.print("\t列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("\t值:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
//内容表获取微博内容
inoboxTable.close();
contentTable.close();
}
测试代码:
@Test
public void getInit() throws IOException {
WeiBoUtil.getInit("1001");
}
结果:
查看某个用户发布的所有微博
public static void getData(String uid) throws IOException {
Connection conn = HBasePoolUtil.getConn();
Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));
Scan scan = new Scan();
RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(uid + "_"));
scan.setFilter(rowFilter);
ResultScanner resultScanner = contentTable.getScanner(scan);
for (Result result : resultScanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
//得到rowkey
System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
//得到列族
System.out.print("\t列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.print("\t列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("\t值:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
contentTable.close();
}
测试代码:
@Test
public void getData() throws IOException {
WeiBoUtil.getData("1002");
}
结果: