HBase常用操作代码实现

package hmr.hbase.first;


import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class hbApp {
    /**
     * 创建名字空间
     * @param args
     * @throws Exception
     */
    @Test
    public void createNS() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //得到管理程序
        Admin admin=conn.getAdmin();
        //名字空间描述符,相当于命令: create_namespace ns1,建了一个数据库名为ns1的数据库
        NamespaceDescriptor nsd=NamespaceDescriptor.create("ns1").build();
        admin.createNamespace(nsd);
        System.out.println("over");
    }
    
    /**
     * 创建表
     * @throws Exception 
     */
    @Test
    public void createTable() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //得到管理程序
        Admin admin=conn.getAdmin();
        //创建表明对象,相当于在ns1数据库下创建了一个t1表
        TableName tabName=TableName.valueOf("ns1:t1");
        HTableDescriptor tab=new HTableDescriptor(tabName);
        //添加列族f1,每个表至少有一个列族,相当于命令: create 'ns1:t1','f1'
        HColumnDescriptor colDesc=new HColumnDescriptor("f1");
        tab.addFamily(colDesc);
        //创建表
        admin.createTable(tab);
        System.out.println("over");
    }
    
    /**
     * 创建表
     * @throws Exception 
     */
    @Test
    public void createTable2() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //得到管理程序
        Admin admin=conn.getAdmin();
        //创建表明对象,相当于在ns1数据库下创建了一个t1表
        TableName tabName=TableName.valueOf("ns1:t1");
        HTableDescriptor tab=new HTableDescriptor(tabName);
        //添加列族f1,每个表至少有一个列族,相当于命令: create 'ns1:t1','f1'
        HColumnDescriptor colDesc=new HColumnDescriptor("f1");
        colDesc.setMaxVersions(5);     //设置保留最大版本数
        colDesc.setMinVersions(2);     //设置保留最小版本数
        colDesc.setTimeToLive(20);    //默认forever,设置存活时间20秒
        colDesc.setKeepDeletedCells(KeepDeletedCells.TTL);   //和TTL时间保持一致
        tab.addFamily(colDesc);
        //创建表
        admin.createTable(tab);
        System.out.println("over");
    }
    
    /**
     * 插入数据
     * @throws Exception 
     */
    @Test
    public void put() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得Table对象
        Table table=conn.getTable(TableName.valueOf("ns1:t1"));
        //将字符串转换成byte[],row1表示行
        byte[] rowkey=Bytes.toBytes("row1");
        Put put=new Put(rowkey);
        //相当于put 'ns1:t1','row1','f1:id',1
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(1));
        //相当于put 'ns1:t1','row1','f1:name','JIM'
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("JIM"));
        //相当于put 'ns1:t1','row1','f1:age',18
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(18));
        table.put(put);
        table.close();
        System.out.println("over");
        
    }
    
    /**
     * 删除数据
     */
    @Test
    public void del() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得Table对象
        Table table=conn.getTable(TableName.valueOf("ns1:t1"));
        Delete del = new Delete(Bytes.toBytes("row1"));
        del.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"));
        //相当于命令:  delete 'ns1:t1','row1','f1:id'
        table.delete(del);
        System.out.println("ok");
        
    }
    
    /**
     * scan数据
     */
    @Test
    public void scan() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得table对象
        Table table=conn.getTable(TableName.valueOf("ns1:t1"));
        //创建扫描对象
        Scan scan=new Scan();
        //扫描器
        ResultScanner scanner=table.getScanner(scan);
        Iterator<Result> it=scanner.iterator();
        while(it.hasNext()){
        Result r=it.next();
        //得到最新(版本最大的)的cell,NavigableMap:可导航map
        NavigableMap<byte[],byte[]> fmap=r.getFamilyMap(Bytes.toBytes("f1"));
        for(Entry<byte[],byte[]> entry : fmap.entrySet()){
            String key=Bytes.toString(entry.getKey());
            if(key.equals("name")){
            String val = Bytes.toString(entry.getValue());
            System.out.println(key+" = "+val);
            }else{
                int val=Bytes.toInt(entry.getValue());
                System.out.println(key+" = "+val);
            }            
        }
        scanner.close();
        System.out.println("ok");        
    }
        }
    
    /**
     * scan数据
     */
    @Test
    public void scan1() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得table对象
        Table table=conn.getTable(TableName.valueOf("ns1:t1"));
        //创建扫描对象
        Scan scan=new Scan();
        //扫描器
        ResultScanner scanner=table.getScanner(scan);
        Iterator<Result> it=scanner.iterator();
        while(it.hasNext()){
        Result r=it.next();
        //得到最新(版本最大的)的cell,NavigableMap:可导航map
        NavigableMap<byte[],byte[]> fmap=r.getFamilyMap(Bytes.toBytes("f1"));
        for(Entry<byte[],byte[]> entry : fmap.entrySet()){
            //得到该列族下的所有列
            List<Cell> cells=r.getColumnCells(Bytes.toBytes("f1"), entry.getKey());
            for(Cell cell : cells){
                String row=Bytes.toString(cell.getRow());
                String f = Bytes.toString(cell.getFamily());
                String col=Bytes.toString(cell.getQualifier());
                long ts=cell.getTimestamp();
                System.out.println(row + "/"+ f+":"+col+"/"+ts);
            }                  
        }
    }
        scanner.close();
        System.out.println("ok");
        }
    
    /**
     * get
     */
    @Test
    public void get() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得table对象
        Table table=conn.getTable(TableName.valueOf("ns1:t1"));
        Get get=new Get(Bytes.toBytes("row1"));
        get.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        //设置时间戳,精确匹配
        //get.setTimeStamp(1547084124592L);
        get.setTimeRange(1547066432623L,1547085490874L);
        Result r=table.get(get);
        List<Cell> cells=r.getColumnCells(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        for(Cell c : cells){
            System.out.println(Bytes.toString(c.getValue()));
        }
        }
    
    /**
     * scan raw
     */
    @Test
    public void rawScan() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得table对象
        Table table=conn.getTable(TableName.valueOf("ns1:t1"));
        Scan scan=new Scan();
        //扫描指定特定列
        scan.addFamily(Bytes.toBytes("f1"));
        scan.setRaw(true);
        //设置版本数
        scan.setMaxVersions(7);
        scan.setTimeRange(1547066432623L,1547090486096L);
        ResultScanner rs=table.getScanner(scan);
        Iterator<Result> it =rs.iterator();
        while(it.hasNext()){
            Result r=it.next();
            List<Cell> cells=r.getColumnCells(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            for(Cell c : cells){
                System.out.println(Bytes.toString(c.getValue()));
            }
        }
        } 

    /**
     * 批量插入数据
     * @throws Exception 
     */
    @Test
    public void putBatch() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得Table对象
        Table table=conn.getTable(TableName.valueOf("ns1:t1"));
    
        List<Put> puts = new ArrayList<Put>();
        Put p=null;
        long start=System.currentTimeMillis();
        for(int i=1;i<=1000000;i++){
            p=new Put(Bytes.toBytes("row"+i));
            p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom"+i));
            puts.add(p);
        }
        table.put(puts);
        System.out.println(System.currentTimeMillis()-start);
    }   

    /**
     * 通过扫描器缓存实现查询
     * @throws Exception 
     */
    @Test
    public void scannerCatch() throws Exception{
        //创建hbase配置对象
        Configuration conf=HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn=ConnectionFactory.createConnection(conf);
        //获得Table对象
        Table table=conn.getTable(TableName.valueOf("ns2:t1"));
        Scan scan=new Scan();
        scan.setStartRow(Bytes.toBytes("row1"));
        scan.setStopRow(Bytes.toBytes("row50000"));
        scan.setCaching(10000);
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        ResultScanner s=table.getScanner(scan);
        Iterator<Result> it=s.iterator();
        long start=System.currentTimeMillis();
        while(it.hasNext()){
            Result r=it.next();
            String str=Bytes.toString(r.getColumnLatestCell(Bytes.toBytes("f1"),Bytes.toBytes("name")).getValue());
            System.out.println(str);
        }
        System.out.println(System.currentTimeMillis()-start);
    }    
}

猜你喜欢

转载自blog.csdn.net/nengyu/article/details/85786438