Querying data in HBase with Spark SQL

1. Demo environment

    The demo was built against Spark 2.2 + Hadoop 2.7 + HBase 1.3 (this combination is tested and compatible; other version combinations may not work together).
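    For a Maven build, this roughly corresponds to the spark-core_2.11 / spark-sql_2.11 artifacts at 2.2.x plus hbase-client and hbase-server at 1.3.x (the TableInputFormat used below ships in hbase-server). These coordinates are an assumption given for reference only; match them to your actual cluster versions.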

2. Code example

package com.yzc.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;


public class SparkOnHbase {

    public static void main(String[] args) throws Exception {

        SparkSession spark = SparkSession.builder()
                .appName("lcc_java_read_hbase_register_to_table")
                .master("local[*]")
                // Use Kryo serialization (more efficient than the default Java serialization)
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();

        // Wrap the underlying SparkContext so the Java Hadoop RDD API can be used
        JavaSparkContext context = new JavaSparkContext(spark.sparkContext());

        // HBase client configuration: the client locates the cluster through ZooKeeper
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "192.168.18.228,192.168.18.229,192.168.18.232");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.master", "192.168.18.228:60000");

        // Tell TableInputFormat which table to read
        Scan scan = new Scan();
        String tableName = "tableTest";
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);

        // TableInputFormat expects the Scan to be passed as a Base64-encoded protobuf string
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        configuration.set(TableInputFormat.SCAN, scanToString);

        // One (row key, Result) pair per HBase row, read through TableInputFormat
        JavaPairRDD<ImmutableBytesWritable, Result> myRDD = context.newAPIHadoopRDD(
                configuration, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        JavaRDD<Row> personsRDD = myRDD.map(new Function<Tuple2<ImmutableBytesWritable,Result>,Row>() {

            @Override
            public Row call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                // Debug output; remove or replace with a logger in real jobs
                System.out.println("====tuple==========" + tuple);
                Result result = tuple._2();
                String rowkey = Bytes.toString(result.getRow());
                // getValue returns null when the cell is absent; Bytes.toString(null) then returns null
                String id = Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("id")));
                String name = Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name")));
                String password = Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("password")));

                // Convert the extracted fields directly into a Spark SQL Row
                return RowFactory.create(rowkey,id,name,password);
            }

        });

        // Schema matching the Row created above; every column is read as a string
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("rowkey", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("password", DataTypes.StringType, true));

        StructType schema=DataTypes.createStructType(structFields);

        Dataset<Row> stuDf = spark.createDataFrame(personsRDD, schema);
        stuDf.printSchema();
        // Register as a temporary view so it can be queried with Spark SQL
        stuDf.createOrReplaceTempView("Person");
        Dataset<Row> nameDf = spark.sql("select * from Person");
        nameDf.show();

        spark.stop();
    }

}
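
The example above scans the entire table. When only a few columns are needed, the Scan can be narrowed before it is serialized into the job configuration, so the region servers ship less data to Spark. Below is a minimal sketch of that idea, reusing the info1 family from the demo table; the caching values are illustrative, not from the original post.

        // Restrict the scan to the columns the DataFrame actually uses
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("id"));
        scan.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("password"));
        scan.setCaching(500);        // rows fetched per RPC; tune to your row size
        scan.setCacheBlocks(false);  // a full scan should not fill the region server block cache

        // Serialize the narrowed Scan exactly as in the example above
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        configuration.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));

Because all three qualifiers live in the same column family, scan.addFamily(Bytes.toBytes("info1")) would also work; addColumn is simply the tighter option when the family contains columns the query never touches.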


Reprinted from blog.csdn.net/u013164612/article/details/80498254