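This article presents a Weibo (microblog) example implemented in Java on top of an HBase database, together with a simple test of the program.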
0 Prerequisites
Hadoop installation: https://blog.csdn.net/Tiezhu_Wang/article/details/113860404
HBase installation: https://blog.csdn.net/Tiezhu_Wang/article/details/114574899
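1 HBase table design
Requirements:
- Publish a blog post
  - Add a row to the blog table (publisher)
  - Add a row to the email (inbox) table for every fan of the publisher (subscribers)
- Follow a user
  - Add the newly followed user to the relation table (attends)
  - From the followed user's side, add the new fan (fans)
  - Add the blog posts published by the followed user to the email table
- Remove or cancel a follow
  - Remove the followed user from the relation table (attends)
  - From the followed user's side, remove the fan (fans)
  - Delete the blog posts published by the followed user from the email table
- Get the blog posts published by followed users
  - Read from the email table the rowkeys of the blog posts published by the users this user follows
  - Fetch the blog content using the rowkeys obtained above
- Information displayed for each post:
  - message: publisher ID, timestamp, content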
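The requirements above can be sketched without HBase at all. The following is a minimal in-memory model, not the implementation used later in this article: the class `InMemoryWeibo` and its method names are hypothetical, and plain Java maps stand in for the blog, relation and email tables. It only illustrates which table each operation touches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the blog, relation and email tables.
class InMemoryWeibo {
    // blog row key -> {publisher id, content}, kept in insertion order
    private final Map<String, String[]> blogs = new LinkedHashMap<>();
    // user id -> ids of the users they follow (attends)
    private final Map<String, Set<String>> attends = new HashMap<>();
    // user id -> ids of their followers (fans)
    private final Map<String, Set<String>> fans = new HashMap<>();
    // user id -> blog row keys delivered to that user's inbox (email)
    private final Map<String, List<String>> inbox = new HashMap<>();
    private long nextKey = 0;

    // Publish: store the post, then deliver its row key to every fan's inbox.
    String post(String userid, String content) {
        String rowKey = "blog-" + (nextKey++);
        blogs.put(rowKey, new String[]{userid, content});
        for (String fan : fans.getOrDefault(userid, new LinkedHashSet<>())) {
            inbox.computeIfAbsent(fan, k -> new ArrayList<>()).add(rowKey);
        }
        return rowKey;
    }

    // Follow: record both directions, then backfill the followed user's posts.
    void follow(String userid, String attendid) {
        boolean added = attends.computeIfAbsent(userid, k -> new LinkedHashSet<>()).add(attendid);
        fans.computeIfAbsent(attendid, k -> new LinkedHashSet<>()).add(userid);
        if (!added) {
            return; // already following, nothing to backfill
        }
        for (Map.Entry<String, String[]> e : blogs.entrySet()) {
            if (e.getValue()[0].equals(attendid)) {
                inbox.computeIfAbsent(userid, k -> new ArrayList<>()).add(e.getKey());
            }
        }
    }

    // Unfollow: undo both directions and drop the delivered posts.
    void unfollow(String userid, String attendid) {
        attends.getOrDefault(userid, new LinkedHashSet<>()).remove(attendid);
        fans.getOrDefault(attendid, new LinkedHashSet<>()).remove(userid);
        List<String> box = inbox.get(userid);
        if (box != null) {
            box.removeIf(k -> blogs.get(k)[0].equals(attendid));
        }
    }

    // Feed: resolve the inbox row keys to "publisher: content" lines.
    List<String> feed(String userid) {
        List<String> out = new ArrayList<>();
        for (String k : inbox.getOrDefault(userid, new ArrayList<>())) {
            String[] b = blogs.get(k);
            out.add(b[0] + ": " + b[1]);
        }
        return out;
    }
}
```

For example, if user 100002 follows 100003 after 100003 has already posted once, the earlier post is backfilled into the inbox of 100002, and any later post is delivered at publish time.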
1.1 user table
ROWKEY | COLUMN+CELL (column family info) | |
---|---|---|
userid | password | nickname |
1.2 blog table
ROWKEY | COLUMN+CELL (column family info) | |
---|---|---|
blogid | userid | content |
1.3 email table
ROWKEY | COLUMN+CELL (column family info) | ||
---|---|---|---|
emailid | attendid | userid | blogrowKey |
1.4 relation table
ROWKEY | COLUMN+CELL (column family info) | ||
---|---|---|---|
relationid | userid | attendid | fanid |
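2 Project dependencies
Create a new Maven project and add the following dependencies. Note that the createTable method in section 4 uses TableDescriptorBuilder and ColumnFamilyDescriptorBuilder, which belong to the HBase 2.x client API, so the HBase version listed here may need to be raised to a 2.x release for that code to compile.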
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>1.2.0</version>
</dependency>
3 Entity classes
Fields of the User class: userid, password, nickname
Fields of the Blog class: userid, date, content
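The article does not list the entity classes themselves. The following is a minimal sketch of what they might look like, inferred only from how the later code uses them (a no-argument `User` with setters, and a three-argument `Blog` constructor). In the project they would presumably be public classes in package `pojo`, one per file; they are shown package-private here so the sketch fits in a single file.

```java
// Sketch of pojo.User: plain fields with getters and setters.
class User {
    private String userid;
    private String password;
    private String nickname;

    public String getUserid() { return userid; }
    public void setUserid(String userid) { this.userid = userid; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public String getNickname() { return nickname; }
    public void setNickname(String nickname) { this.nickname = nickname; }
}

// Sketch of pojo.Blog: the date is kept as a String because the mapper
// code passes the formatted cell timestamp as text.
class Blog {
    private final String userid;
    private final String date;
    private final String content;

    public Blog(String userid, String date, String content) {
        this.userid = userid;
        this.date = date;
        this.content = content;
    }

    public String getUserid() { return userid; }
    public String getDate() { return date; }
    public String getContent() { return content; }
}
```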
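4 Base utility class BaseMapper
The BaseMapper class covers the HBase basics: table names, initialization, shutdown and so on. The operations it implements and their code are as follows: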
package dao;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
public class BaseMapper {
public static Configuration configuration;
public static Connection connection;
public static Admin admin;
// 1. Initialize the configuration, connection and admin handle
public void init(){
configuration = HBaseConfiguration.create();
configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
try{
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
}catch (IOException e){
e.printStackTrace();
}
}
// 2. Close the admin handle and the connection
public void close(){
try{
if(admin != null){
admin.close();
}
if(null != connection){
connection.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
// 3. Create a table with the given column families
public void createTable(String myTableName,String[] colFamily) throws IOException {
TableName tableName = TableName.valueOf(myTableName);
if(admin.tableExists(tableName)){
System.out.println("table exists!");
}else {
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
for(String str:colFamily){
ColumnFamilyDescriptor family =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
tableDescriptor.setColumnFamily(family);
}
admin.createTable(tableDescriptor.build());
}
}
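// 4. Insert a single cell
public void insertData(String tableName,String rowKey,String colFamily,String col,String val) throws IOException, InterruptedException {
Table table = connection.getTable(TableName.valueOf(tableName));
// Bytes.toBytes always encodes as UTF-8, unlike String.getBytes()
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(colFamily),Bytes.toBytes(col),Bytes.toBytes(val));
table.put(put);
table.close();
// Row keys elsewhere are built from the current time in milliseconds;
// this 1 ms pause keeps two consecutive keys from colliding
Thread.sleep(1);
}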
// 5. Delete a row
public void deleteData(String tableName, String rowKey) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
// Close the table handle, as insertData does
table.close();
}
// 6. Get the value of a single cell
public String getData(String tableName,String rowKey,String colFamily, String col)throws Exception{
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(colFamily),Bytes.toBytes(col));
Result result = table.get(get);
table.close();
byte[] value = result.getValue(Bytes.toBytes(colFamily),Bytes.toBytes(col));
// Return null when the cell does not exist so that callers can test for absence
return value == null ? null : Bytes.toString(value);
}
// 7. Get all rows of a table
// Note: this project holds very little data, so a full scan is used to fetch what is needed
public List<Map<String, String>> getAllData(String tableName) throws Exception {
Table table = connection.getTable(TableName.valueOf(tableName));
List<Map<String, String>> data = new ArrayList<Map<String, String>>();
ResultScanner results = table.getScanner(new Scan());
for(Result result : results){
Map<String, String> singleData = new HashMap<String, String>();
String rowKey = "";
String date = "";
for(Cell cell : result.rawCells()){
if(rowKey.equals("")){
rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
date = new Date(cell.getTimestamp()) + "";
}
String colName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
singleData.put(colName,value);
}
singleData.put("rowKey",rowKey);
singleData.put("date",date);
data.add(singleData);
}
// Release the scanner and the table before returning
results.close();
table.close();
return data;
}
// 8. Get a single row
public Map<String, String> getRow(String tableName, String rowKey) throws Exception {
Table table = connection.getTable(TableName.valueOf(tableName));
Map<String, String> data = new HashMap<String,String>();
Get get = new Get(rowKey.getBytes());
if(!get.isCheckExistenceOnly()){
Result result = table.get(get);
String date = "";
for(Cell cell : result.rawCells()){
if(date.equals("")){
date = new Date(cell.getTimestamp())+"";
}
String colName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
data.put(colName,value);
}
data.put("rowKey",rowKey);
data.put("date",date);
}
// Release the table before returning
table.close();
return data;
}
}
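BaseMapper opens a Table handle in every method and closes it by hand. Since HBase's Table and ResultScanner are Closeable, a try-with-resources block is one way to make sure the handle is released even when an operation throws. The sketch below shows only the pattern: `FakeTable` and `ResourceSketch` are hypothetical stand-ins so that the example runs without an HBase cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an HBase Table; it only records open and close calls.
class FakeTable implements AutoCloseable {
    static final List<String> log = new ArrayList<>();
    private final String name;

    FakeTable(String name) {
        this.name = name;
        log.add("open " + name);
    }

    // Pretend lookup that fails on an empty row key.
    String get(String rowKey) {
        if (rowKey.isEmpty()) {
            throw new IllegalArgumentException("empty row key");
        }
        return name + "/" + rowKey;
    }

    @Override
    public void close() {
        log.add("close " + name);
    }
}

class ResourceSketch {
    // close() runs on both the normal path and the exception path.
    static String read(String tableName, String rowKey) {
        try (FakeTable table = new FakeTable(tableName)) {
            return table.get(rowKey);
        }
    }
}
```

Applied to BaseMapper, the same shape would wrap `connection.getTable(...)` in the resource clause, which removes the need for explicit `table.close()` calls.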
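5 Basic operations class BlogMapper
This class implements the detailed steps from the requirements, such as adding a blog post or adding an email entry. The definitions of the entity classes Blog and User are omitted. BlogMapper extends the BaseMapper class above. Its methods and code are as follows: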
package dao;
import pojo.Blog;
import pojo.User;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class BlogMapper extends BaseMapper{
public BlogMapper(){
this.init();
}
/**
* @param tableName
* @param colFamily
* @throws IOException
*/
public void createTable(String tableName,String colFamily) throws IOException {
this.createTable(tableName,new String[]{
colFamily});
}
/**
* @param userid
* @param password
* @param nickname
* @throws IOException
*/
public void addUser(String userid,String password,String nickname) throws IOException, InterruptedException {
this.insertData("user",userid,"info","password",password);
this.insertData("user",userid,"info","nickname",nickname);
}
/**
* @param rowKey might be the time
* @param userid is the userid of the one posting the blog
* @param content is what the blog is
* @throws IOException
*/
public void addBlog(String rowKey,String userid,String content) throws IOException, InterruptedException {
this.insertData("blog",rowKey,"info","userid",userid);
this.insertData("blog",rowKey,"info","content",content);
}
/**
* @param rowKey might be the time
* @param attendid is the followed user who posted the blog
* @param userid is the fan who receives the entry
* @param blogrowKey is the row key of the followed user's blog
* @throws IOException
*
* Suppose I have a fan named A.
* If I post a blog, then my userid is the attendid and A's userid is the userid.
*/
public void addEmail(String rowKey,String attendid, String userid, String blogrowKey) throws IOException, InterruptedException {
this.insertData("email",rowKey,"info","attendid",attendid);
this.insertData("email",rowKey,"info","userid",userid);
this.insertData("email",rowKey,"info","blogrowKey",blogrowKey);
}
/**
* @param rowKey
* @param userid
* @param attendid
* @param fanid
* @throws IOException
*
* The attendid and the fanid must be the same if two users attend each other.
* Otherwise, one of them is empty or both are empty.
*/
public void addRelation(String rowKey,String userid,String attendid,String fanid) throws IOException, InterruptedException {
this.insertData("relation",rowKey,"info","userid",userid);
this.insertData("relation",rowKey,"info","attendid",attendid);
this.insertData("relation",rowKey,"info","fanid",fanid);
}
/**
* @param userid
* @param attendid is the userid of whom the user attends
* @throws IOException
*/
public void removeAttend(String userid,String attendid) throws Exception {
List<Map<String, String>> relations = this.getAllData("relation");
String rowKey = "";
for(Map<String, String> relation : relations){
if(relation.get("userid").equals(userid) && relation.get("attendid").equals(attendid)){
rowKey = relation.get("rowKey");
// set attendid none
this.insertData("relation",rowKey,"info","attendid","");
if(relation.get("fanid").equals("")){
this.deleteData("relation",rowKey);
}
break;
}
}
}
/**
* @param userid
* @param fanid is the userid of the fan to remove from the user
* @throws IOException
*/
public void removeFan(String userid,String fanid) throws Exception {
List<Map<String, String>> relations = this.getAllData("relation");
String rowKey = "";
for(Map<String, String> relation : relations){
if(relation.get("userid").equals(userid) && relation.get("fanid").equals(fanid)){
rowKey = relation.get("rowKey");
// set fanid none
this.insertData("relation",rowKey,"info","fanid","");
if(relation.get("attendid").equals("")){
this.deleteData("relation",rowKey);
}
break;
}
}
}
/**
* @param attendid is whom the user attends
* @param userid
* @throws IOException
*/
public void removeEmails(String attendid,String userid) throws Exception {
List<Map<String, String>> emails = this.getAllData("email");
String rowKey = "";
for(Map<String, String> email : emails){
if(email.get("attendid").equals(attendid) && email.get("userid").equals(userid)){
rowKey = email.get("rowKey");
this.deleteData("email",rowKey);
}
}
}
/**
* @param attendid is whom the user attends
* @param userid
* @return
* @throws IOException
*/
public List<String> getBlogRowKeys(String attendid, String userid) throws Exception {
List<String> data = new ArrayList<String>();
List<Map<String,String>> emails = this.getAllData("email");
for(Map<String,String> email : emails){
if(email.get("attendid").equals(attendid) && email.get("userid").equals(userid)){
// Return the row key of the referenced blog, not the email row's own key
data.add(email.get("blogrowKey"));
}
}
return data;
}
/**
* @param userid
* @return
* @throws Exception
*/
public List<String> getBlogRowKeysByUserid(String userid) throws Exception {
List<String> data = new ArrayList<String>();
List<Map<String,String>> blogs = this.getAllData("blog");
for(Map<String,String> blog : blogs){
if(blog.get("userid").equals(userid)){
data.add(blog.get("rowKey"));
}
}
return data;
}
/**
* @param rowKey
* @return
* @throws IOException
*
* Search the blog by the rowKey
*/
public Blog getBlog(String rowKey) throws Exception {
Map<String, String> data = this.getRow("blog",rowKey);
String userid = data.get("userid");
String date = data.get("date");
String content = data.get("content");
Blog blog = new Blog(userid,date,content);
return blog;
}
/**
* @param userid
* @return
* @throws Exception
*/
public List<String> getAllFans(String userid) throws Exception {
List<Map<String, String>> relations = this.getAllData("relation");
List<String> fans = new ArrayList<String>();
for(Map<String, String> relation : relations){
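// Skip rows whose fanid is empty (rows that only record an attend)
if(relation.get("userid").equals(userid) && !"".equals(relation.get("fanid"))){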
fans.add(relation.get("fanid"));
}
}
return fans;
}
/**
* @param userid
* @return
* @throws Exception
*/
public List<String> getAllAttends(String userid) throws Exception {
List<Map<String, String>> relations = this.getAllData("relation");
List<String> attends = new ArrayList<String>();
for(Map<String, String> relation : relations){
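// Skip rows whose attendid is empty (rows that only record a fan)
if(relation.get("userid").equals(userid) && !"".equals(relation.get("attendid"))){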
attends.add(relation.get("attendid"));
}
}
return attends;
}
/**
* @param userid
* @return
* @throws Exception
*/
public User getUserById(String userid) throws Exception {
User user = new User();
String password = "";
if(null!=(password=this.getData("user",userid,"info","password"))){
user.setUserid(userid);
user.setPassword(password);
user.setNickname(this.getData("user",userid,"info","nickname"));
}
return user;
}
}
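Several BlogMapper methods (getAllFans, getAllAttends, getBlogRowKeysByUserid) repeat the same loop: scan every row returned by getAllData, keep the rows where one column matches, and collect another column. The following sketch factors that loop into one helper. `RowFilter.collect` is a hypothetical name, and skipping empty values is an assumption made here because the relation table stores "" for an unused attendid or fanid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RowFilter {
    // Returns the non-empty values of returnColumn taken from the rows
    // whose matchColumn equals matchValue, in scan order.
    static List<String> collect(List<Map<String, String>> rows,
                                String matchColumn, String matchValue,
                                String returnColumn) {
        List<String> out = new ArrayList<>();
        for (Map<String, String> row : rows) {
            String value = row.get(returnColumn);
            if (matchValue.equals(row.get(matchColumn)) && value != null && !value.isEmpty()) {
                out.add(value);
            }
        }
        return out;
    }
}
```

With such a helper, getAllFans(userid) could be written as `RowFilter.collect(getAllData("relation"), "userid", userid, "fanid")`. Each call is still a full table scan, which the article accepts because the data set is small.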
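6 Service class BlogService
The BlogService class implements the concrete operations, that is, the business logic: posting a blog, following a user, unfollowing a user, and fetching the blogs of followed users. The code is as follows: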
package service;
import dao.BlogMapper;
import pojo.Blog;
import pojo.User;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class BlogService {
BlogMapper blogMapper;
public BlogService() {
this.blogMapper = new BlogMapper();
}
public void postBlog(String userid, String content){
try{
String blogRowKey = new Date().getTime()+"";
// Add blog to blog list
blogMapper.addBlog(blogRowKey,userid,content);
List<String> fans = blogMapper.getAllFans(userid);
for(String fan : fans){
// Add email to email list
blogMapper.addEmail(new Date().getTime()+"",userid,fan,blogRowKey);
}
}catch (Exception e){
e.printStackTrace();
}
}
public void addAttend(String userid, String attendid) {
try{
// If the attender's fan doesn't contain the user
if(!blogMapper.getAllFans(attendid).contains(userid)){
// Add the attend to the user
blogMapper.addRelation(new Date().getTime()+"",userid,attendid,"");
// Add the fan to the attender
blogMapper.addRelation(new Date().getTime()+"",attendid,"",userid);
// If they attend each other
if(blogMapper.getAllFans(userid).contains(attendid) && blogMapper.getAllFans(attendid).contains(userid)){
// Remove the four one-way relations
blogMapper.removeAttend(userid,attendid);
blogMapper.removeAttend(attendid,userid);
blogMapper.removeFan(userid,attendid);
blogMapper.removeFan(attendid,userid);
// Add attend and fan to each other
blogMapper.addRelation(new Date().getTime()+"",userid,attendid,attendid);
blogMapper.addRelation(new Date().getTime()+"",attendid,userid,userid);
}
}
// Get all blogs by the attendid
List<String> blogRowKeys = blogMapper.getBlogRowKeysByUserid(attendid);
for(String blogRowKey : blogRowKeys){
// Add email
blogMapper.addEmail(new Date().getTime()+"",attendid,userid,blogRowKey);
}
}catch (Exception e){
e.printStackTrace();
}
}
public void removeAttend(String userid, String attendid) {
try{
blogMapper.removeAttend(userid, attendid);
blogMapper.removeFan(attendid,userid);
blogMapper.removeEmails(attendid,userid);
}catch (Exception e){
e.printStackTrace();
}
}
public List<Blog> getBlogsByFanid(String userid){
List<Blog> blogs = new ArrayList<Blog>();
try{
// Get all attenders of the user
List<String> attends = blogMapper.getAllAttends(userid);
for(String attendid : attends){
// Get all blogRowKeys of the attender
List<String> attendBlogsRowKey = blogMapper.getBlogRowKeysByUserid(attendid);
for(String blogRowKey : attendBlogsRowKey){
// Add blogs of the attender to the list
blogs.add(blogMapper.getBlog(blogRowKey));
}
}
}catch (Exception e){
e.printStackTrace();
}
return blogs;
}
public User getUserById(String userid){
User user = null;
try{
user = blogMapper.getUserById(userid);
}catch (Exception e){
e.printStackTrace();
}
return user;
}
}
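BlogService builds every row key from `new Date().getTime()`, so two keys generated within the same millisecond would be identical; the code above avoids this only because insertData pauses for 1 ms after each write. As an alternative, the sketch below generates keys that are still based on the clock but never repeat within one JVM. `RowKeys` is a hypothetical helper, not part of the original project, and it does not address uniqueness across several processes.

```java
import java.util.concurrent.atomic.AtomicLong;

class RowKeys {
    // Last key handed out; shared by all callers in this JVM.
    private static final AtomicLong last = new AtomicLong();

    // Returns the current time in milliseconds, bumped past the previous
    // key when the clock has not advanced, so keys are strictly increasing.
    static String next() {
        long now = System.currentTimeMillis();
        long key = last.updateAndGet(prev -> Math.max(prev + 1, now));
        return Long.toString(key);
    }
}
```

A call such as `blogMapper.addBlog(RowKeys.next(), userid, content)` would then no longer depend on the sleep for uniqueness.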
7 Test class TestAll
import dao.BlogMapper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import pojo.Blog;
import service.BlogService;
import java.io.IOException;
import java.util.List;
public class TestAll {
BlogService bs;
BlogMapper bm;
@Before
public void init(){
this.bs = new BlogService();
}
@Test
public void testCreateTable() throws IOException {
this.bm = new BlogMapper();
bm.createTable("user","info");
bm.createTable("blog","info");
bm.createTable("email","info");
bm.createTable("relation","info");
}
@Test
public void testPostBlog(){
bs.postBlog("100001","What a wonderful day!");
bs.postBlog("100001","I love Shenyang!");
bs.postBlog("100001","My math is really awful...");
bs.postBlog("100001","There is an exam tomorrow!");
bs.postBlog("100003","How wonderful!");
bs.postBlog("100003","I hate Shenyang!");
bs.postBlog("100004","My English is really awful...");
bs.postBlog("100005","There is an exam today!");
}
@Test
public void testAddAttend(){
bs.addAttend("100001","100002");
bs.addAttend("100002","100001");
bs.addAttend("100002","100003");
bs.addAttend("100002","100004");
bs.addAttend("100002","100005");
}
@Test
public void testRemoveAttend(){
bs.removeAttend("100001","100002");
}
@Test
public void testGetBlogByFanid(){
List<Blog> list = bs.getBlogsByFanid("100002");
for(Blog blog : list){
System.out.println("User. : "+ bs.getUserById(blog.getUserid()).getNickname());
System.out.println("Date. : "+blog.getDate());
System.out.println("Cont. : "+blog.getContent()+"\n---");
}
}
}
8 Running the tests
First start HDFS and HBase:
start-dfs.sh
start-hbase.sh
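Run the testCreateTable method first to create the four tables. The console prints a lot of output and ends by showing that the tables were created.
Enter the hbase shell and run list to check.
The four empty tables now exist. Next, register a few new users: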
put 'user','100001','info:nickname','Jack'
put 'user','100002','info:nickname','Mary'
put 'user','100003','info:nickname','Lucy'
put 'user','100004','info:nickname','John'
put 'user','100005','info:nickname','Tom'
put 'user','100001','info:password','123456'
put 'user','100002','info:password','123456'
put 'user','100003','info:password','123456'
put 'user','100004','info:password','123456'
put 'user','100005','info:password','123456'
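Run the remaining tests in order; the output and the database contents update accordingly. The last method in that order is testGetBlogByFanid, which prints the nickname, date and content of each blog post published by the users that user 100002 follows.
Testing is complete.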