package hmr.hbase.first;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
public class hbApp {
/**
* 创建名字空间
* @param args
* @throws Exception
*/
@Test
public void createNS() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//得到管理程序
Admin admin=conn.getAdmin();
//名字空间描述符,相当于命令: create_namespace ns1,建了一个数据库名为ns1的数据库
NamespaceDescriptor nsd=NamespaceDescriptor.create("ns1").build();
admin.createNamespace(nsd);
System.out.println("over");
}
/**
* 创建表
* @throws Exception
*/
@Test
public void createTable() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//得到管理程序
Admin admin=conn.getAdmin();
//创建表明对象,相当于在ns1数据库下创建了一个t1表
TableName tabName=TableName.valueOf("ns1:t1");
HTableDescriptor tab=new HTableDescriptor(tabName);
//添加列族f1,每个表至少有一个列族,相当于命令: create 'ns1:t1','f1'
HColumnDescriptor colDesc=new HColumnDescriptor("f1");
tab.addFamily(colDesc);
//创建表
admin.createTable(tab);
System.out.println("over");
}
/**
* 创建表
* @throws Exception
*/
@Test
public void createTable2() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//得到管理程序
Admin admin=conn.getAdmin();
//创建表明对象,相当于在ns1数据库下创建了一个t1表
TableName tabName=TableName.valueOf("ns1:t1");
HTableDescriptor tab=new HTableDescriptor(tabName);
//添加列族f1,每个表至少有一个列族,相当于命令: create 'ns1:t1','f1'
HColumnDescriptor colDesc=new HColumnDescriptor("f1");
colDesc.setMaxVersions(5); //设置保留最大版本数
colDesc.setMinVersions(2); //设置保留最小版本数
colDesc.setTimeToLive(20); //默认forever,设置存活时间20秒
colDesc.setKeepDeletedCells(KeepDeletedCells.TTL); //和TTL时间保持一致
tab.addFamily(colDesc);
//创建表
admin.createTable(tab);
System.out.println("over");
}
/**
* 插入数据
* @throws Exception
*/
@Test
public void put() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得Table对象
Table table=conn.getTable(TableName.valueOf("ns1:t1"));
//将字符串转换成byte[],row1表示行
byte[] rowkey=Bytes.toBytes("row1");
Put put=new Put(rowkey);
//相当于put 'ns1:t1','row1','f1:id',1
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(1));
//相当于put 'ns1:t1','row1','f1:name','JIM'
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("JIM"));
//相当于put 'ns1:t1','row1','f1:age',18
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(18));
table.put(put);
table.close();
System.out.println("over");
}
/**
* 删除数据
*/
@Test
public void del() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得Table对象
Table table=conn.getTable(TableName.valueOf("ns1:t1"));
Delete del = new Delete(Bytes.toBytes("row1"));
del.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"));
//相当于命令: delete 'ns1:t1','row1','f1:id'
table.delete(del);
System.out.println("ok");
}
/**
* scan数据
*/
@Test
public void scan() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得table对象
Table table=conn.getTable(TableName.valueOf("ns1:t1"));
//创建扫描对象
Scan scan=new Scan();
//扫描器
ResultScanner scanner=table.getScanner(scan);
Iterator<Result> it=scanner.iterator();
while(it.hasNext()){
Result r=it.next();
//得到最新(版本最大的)的cell,NavigableMap:可导航map
NavigableMap<byte[],byte[]> fmap=r.getFamilyMap(Bytes.toBytes("f1"));
for(Entry<byte[],byte[]> entry : fmap.entrySet()){
String key=Bytes.toString(entry.getKey());
if(key.equals("name")){
String val = Bytes.toString(entry.getValue());
System.out.println(key+" = "+val);
}else{
int val=Bytes.toInt(entry.getValue());
System.out.println(key+" = "+val);
}
}
scanner.close();
System.out.println("ok");
}
}
/**
* scan数据
*/
@Test
public void scan1() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得table对象
Table table=conn.getTable(TableName.valueOf("ns1:t1"));
//创建扫描对象
Scan scan=new Scan();
//扫描器
ResultScanner scanner=table.getScanner(scan);
Iterator<Result> it=scanner.iterator();
while(it.hasNext()){
Result r=it.next();
//得到最新(版本最大的)的cell,NavigableMap:可导航map
NavigableMap<byte[],byte[]> fmap=r.getFamilyMap(Bytes.toBytes("f1"));
for(Entry<byte[],byte[]> entry : fmap.entrySet()){
//得到该列族下的所有列
List<Cell> cells=r.getColumnCells(Bytes.toBytes("f1"), entry.getKey());
for(Cell cell : cells){
String row=Bytes.toString(cell.getRow());
String f = Bytes.toString(cell.getFamily());
String col=Bytes.toString(cell.getQualifier());
long ts=cell.getTimestamp();
System.out.println(row + "/"+ f+":"+col+"/"+ts);
}
}
}
scanner.close();
System.out.println("ok");
}
/**
* get
*/
@Test
public void get() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得table对象
Table table=conn.getTable(TableName.valueOf("ns1:t1"));
Get get=new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
//设置时间戳,精确匹配
//get.setTimeStamp(1547084124592L);
get.setTimeRange(1547066432623L,1547085490874L);
Result r=table.get(get);
List<Cell> cells=r.getColumnCells(Bytes.toBytes("f1"), Bytes.toBytes("name"));
for(Cell c : cells){
System.out.println(Bytes.toString(c.getValue()));
}
}
/**
* scan raw
*/
@Test
public void rawScan() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得table对象
Table table=conn.getTable(TableName.valueOf("ns1:t1"));
Scan scan=new Scan();
//扫描指定特定列
scan.addFamily(Bytes.toBytes("f1"));
scan.setRaw(true);
//设置版本数
scan.setMaxVersions(7);
scan.setTimeRange(1547066432623L,1547090486096L);
ResultScanner rs=table.getScanner(scan);
Iterator<Result> it =rs.iterator();
while(it.hasNext()){
Result r=it.next();
List<Cell> cells=r.getColumnCells(Bytes.toBytes("f1"), Bytes.toBytes("name"));
for(Cell c : cells){
System.out.println(Bytes.toString(c.getValue()));
}
}
}
/**
* 批量插入数据
* @throws Exception
*/
@Test
public void putBatch() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得Table对象
Table table=conn.getTable(TableName.valueOf("ns1:t1"));
List<Put> puts = new ArrayList<Put>();
Put p=null;
long start=System.currentTimeMillis();
for(int i=1;i<=1000000;i++){
p=new Put(Bytes.toBytes("row"+i));
p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom"+i));
puts.add(p);
}
table.put(puts);
System.out.println(System.currentTimeMillis()-start);
}
/**
* 通过扫描器缓存实现查询
* @throws Exception
*/
@Test
public void scannerCatch() throws Exception{
//创建hbase配置对象
Configuration conf=HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn=ConnectionFactory.createConnection(conf);
//获得Table对象
Table table=conn.getTable(TableName.valueOf("ns2:t1"));
Scan scan=new Scan();
scan.setStartRow(Bytes.toBytes("row1"));
scan.setStopRow(Bytes.toBytes("row50000"));
scan.setCaching(10000);
scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
ResultScanner s=table.getScanner(scan);
Iterator<Result> it=s.iterator();
long start=System.currentTimeMillis();
while(it.hasNext()){
Result r=it.next();
String str=Bytes.toString(r.getColumnLatestCell(Bytes.toBytes("f1"),Bytes.toBytes("name")).getValue());
System.out.println(str);
}
System.out.println(System.currentTimeMillis()-start);
}
}