谷粒微博:代码实现

常量接口

public interface Constant {
    //命名空间
    String NAMESPACE="weibo";

    //内容表
    String CONTENT="weibo:content";

    //用户关系表
    String RELATIONS="weibo:relations";

    //收件箱表
    String INBOX="weibo:inbox";
}

工具类及测试代码代码

  • WeiBoUtil
public class WeiBoUtil {

    private static Configuration cfg = HBaseConfiguration.create();

    static {
        cfg.set("hbase.zookeeper.quorum", "hcmaster:2181,hcslave1:2181,hcslave2:2181");
    }
}
  • Weibo
public class WeiBoUtilTest {
}

连接池工具类

public class HBasePoolUtil {
    private static Configuration conf = null;
    // 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    private static ExecutorService executor = null;
    private static Connection conn = null;

    static {
        try {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hcmaster:2181,hcslave1:2181,hcslave2:2181");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.defaults.for.version.skip", "true");
            executor = new ThreadPoolExecutor(5, 15, 1000,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(10),  // 使用有界队列,避免OOM
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            conn = ConnectionFactory.createConnection(conf, executor);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Connection getConn() {
        return conn;
    }
}

具体代码

创建命名空间以及表名的定义

public static void createNamespace(String namespace) throws IOException {
    Connection conn = HBasePoolUtil.getConn();
    Admin admin = conn.getAdmin();
    NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
    admin.createNamespace(namespaceDescriptor);
    admin.close();
}

测试代码:

@Test
public void createNamespace() throws IOException {
    WeiBoUtil.createNamespace("weibo");
}

结果:

创建表

public static void creteaTable(String tableName, int version, String... columnFamily) throws IOException {
    Connection conn = HBasePoolUtil.getConn();
    Admin admin = conn.getAdmin();

    boolean tableExists = admin.tableExists(TableName.valueOf(tableName));
    if (tableExists) {
        System.out.println(tableName + " exists!");
        return;
    }
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));

    for (String cf : columnFamily) {
        ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(cf.getBytes()).setMaxVersions(version).build();
        builder.setColumnFamily(columnFamilyDescriptor);
    }
    admin.createTable(builder.build());
    admin.close();
}
测试代码:
@Test
public void creteaTable() throws IOException {
    //创建内容表
    WeiBoUtil.creteaTable(Constant.CONTENT, 1, "info");
    //创建用户关系表
    WeiBoUtil.creteaTable(Constant.RELATIONS, 1, "attends", "fans");
    //创建收件箱表
    WeiBoUtil.creteaTable(Constant.INBOX, 3, "info");
}
运行程序发现在weibo命名空间下创建了weibo:contet、weibo:relations、webo:inbox三个表。

发布微博内容

发布微博时,首先需要往微博内容表中添加数据,然后需要更新收件箱表fans的数据,它包括:
1. 获取当前uid的fans
2. 更新收件箱表中fans的数据

public static void createData(String uid, String content) throws IOException {
    Connection conn = HBasePoolUtil.getConn();
    long currentTimeMillis = System.currentTimeMillis();
    String rowKey = uid + "_" + currentTimeMillis;
    //往内容表中添加数据
    Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));
    Put put = new Put(Bytes.toBytes(rowKey));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), currentTimeMillis, Bytes.toBytes(content)); //手动指定时间戳
    contentTable.put(put);
    //用户发了微博,要同时更新它的fans的微博收件列表
    //1.获取关注了微博发送者的用户列表
    Table relationsTable = conn.getTable(TableName.valueOf(Constant.RELATIONS));
    //获取关系表中fans
    Get get = new Get(Bytes.toBytes(uid));
    get.addFamily(Bytes.toBytes("fans"));
    Result result = relationsTable.get(get);
    List<Cell> cells = result.listCells();
    //如果用户没有关注者,不需要更新微博收件列表表
    if (cells == null || cells.size() <= 0) {
        return;
    }
    List<Put> putList = new ArrayList<>();
    for (Cell cell : cells) {
        byte[] fansUid = CellUtil.cloneQualifier(cell);
        Put inboxPut = new Put(fansUid);
        inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
        putList.add(inboxPut);
    }
    //更新fans的收件箱表
    Table inboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
    inboxTable.put(putList);

    inboxTable.close();
    relationsTable.close();
    contentTable.close();
}
测试代码:
@Test
public void createData() throws IOException {
    WeiBoUtil.createData("1001","1001 de weibo");
    WeiBoUtil.createData("1001","1001 de weibo");
    WeiBoUtil.createData("1001","1001 de weibo");
    WeiBoUtil.createData("1002","1002 de weibo");
    WeiBoUtil.createData("1002","1002 de weibo");
    WeiBoUtil.createData("1002","1002 de weibo");
    WeiBoUtil.createData("1003","1002 de weibo");
}
结果:

添加关注用户

具体步骤::关注别人的同时会成为别人的fans
在微博用户关系表中
a、 对当前主动操作的用户添加新关注的好友attends
b、 对被关注的用户添加新的粉丝fans
微博收件箱表中,添加所关注的用户发布的微博
a、 在微博内容表中获取被关注者最新的几条微博的rowkey
b、 收件箱中添加操作人所关注的人的数据(最新的几条信息)

public static void addAttends(String uid, String... uids) throws IOException {
    //参数过滤
    if(uids == null || uids.length <= 0 || uid == null || uid.length() <= 0){
        return;
    }

    Connection conn = HBasePoolUtil.getConn();

    Table relationsTable = conn.getTable(TableName.valueOf(Constant.RELATIONS));
    Put relationsPut = new Put(Bytes.toBytes(uid));//操作者的put
    List<Put> putList = new ArrayList<>();
    for (String u : uids) {
        relationsPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(u), Bytes.toBytes(u));
        Put fansPut = new Put(Bytes.toBytes(u));//创建被关注者的Put对象,添加操作人的attends
        fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));//添加被操作人的fans
        putList.add(fansPut);
    }
    putList.add(relationsPut);
    relationsTable.put(putList);

    //获取内容表中被关注者的rowkey
    Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));
    Put inboxPut = new Put(uid.getBytes());
    for (String s : uids) {
        Scan scan = new Scan();
        scan.setFilter(new PrefixFilter(s.getBytes()));
        ResultScanner resultScanner = contentTable.getScanner(scan);
        for (Result result : resultScanner) {
            String rowkey = Bytes.toString(result.getRow());
            String[] split = rowkey.split("_");
            byte[] row = result.getRow(); //获取rowkey
            //都采用发布微博时的时间戳
            inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), Long.parseLong(split[1]), row);
        }
    }
    Table inboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
    inboxTable.put(inboxPut);

    inboxTable.close();
    relationsTable.close();
    contentTable.close();
}
测试代码:
@Test
public void addAttends() throws IOException {
    WeiBoUtil.addAttends("1001","1002","1003");
}
结果:

取关用户

在微博用户关系表中
1. 删除操作者关注列族的待取关用户
2. 删除待取关用户用户fans列族的操作者
收件箱表

public static void deleteAttended(String uid, String... uids) throws IOException {
    //参数过滤
    if(uids == null || uids.length <= 0 || uid == null || uid.length() <= 0){
        return;
    }
    //获取连接
    Connection conn = HBasePoolUtil.getConn();
    //获取表对象(2个)
    Table relationsTable = conn.getTable(TableName.valueOf(Constant.RELATIONS));
    //1
    List<Delete> deleteList = new ArrayList<>();
    Delete attendsDelete = new Delete(Bytes.toBytes(uid));
    for (String u : uids) {
        attendsDelete.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(u));
        Delete fansDelete = new Delete(Bytes.toBytes(u));
        fansDelete.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
        deleteList.add(fansDelete);
    }
    deleteList.add(attendsDelete);
    relationsTable.delete(deleteList);
    //2、删除收件箱表操作者取关用户的信息
    Table inboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
    Delete inboxDelete = new Delete(Bytes.toBytes(uid));
    for (String u : uids) {
        inboxDelete.addColumn(Bytes.toBytes("info"), Bytes.toBytes(u));//只能删除一条
    }
    inboxTable.delete(inboxDelete);

    inboxTable.close();
    relationsTable.close();
}
测试代码
@Test
public void deleteAttended() throws IOException {
    WeiBoUtil.deleteAttended("1001","1002");
}
结果:

获取关注的人的微博内容:初始化页面用

步骤:
a、从微博收件箱中获取所关注的用户的微博RowKey
b、根据获取的RowKey,从微博内容表中得到微博内容

public static void getInit(String uid) throws IOException {
    //获取连接
    Connection conn = HBasePoolUtil.getConn();
    //获取表对象(2个)
    Table inoboxTable = conn.getTable(TableName.valueOf(Constant.INBOX));
    Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));
    //获取收件箱数据
    Get inoboxGet = new Get(Bytes.toBytes(uid));
    inoboxGet.readAllVersions();

    List<Get> getList = new ArrayList<>();
    Result result = inoboxTable.get(inoboxGet);
    Cell[] cells = result.rawCells();
    for (Cell cell : cells) {
        Get contentGet = new Get(CellUtil.cloneValue(cell));
        getList.add(contentGet);
    }
    Result[] results = contentTable.get(getList);
    for (Result res : results) {
        for (Cell cell : res.rawCells()) {
            //得到rowkey
            System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
            //得到列族
            System.out.print("\t列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.print("\t列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("\t值:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
    //内容表获取微博内容
    inoboxTable.close();
    contentTable.close();
}
测试代码:
@Test
public void getInit() throws IOException {
    WeiBoUtil.getInit("1001");
}
结果:

查看某个用户发布的所有微博

public static void getData(String uid) throws IOException {
    Connection conn = HBasePoolUtil.getConn();
    Table contentTable = conn.getTable(TableName.valueOf(Constant.CONTENT));

    Scan scan = new Scan();
    RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(uid + "_"));
    scan.setFilter(rowFilter);
    ResultScanner resultScanner = contentTable.getScanner(scan);
    for (Result result : resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            //得到rowkey
            System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
            //得到列族
            System.out.print("\t列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.print("\t列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("\t值:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
    contentTable.close();
}

测试代码:

@Test
public void getData() throws IOException {
    WeiBoUtil.getData("1002");
}

结果:

发布了407 篇原创文章 · 获赞 798 · 访问量 7万+

猜你喜欢

转载自blog.csdn.net/lianghecai52171314/article/details/104873420